Source Code Cross Referenced for JChannel.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        package org.jgroups;
0002:
0003:        import org.apache.commons.logging.Log;
0004:        import org.apache.commons.logging.LogFactory;
0005:        import org.jgroups.conf.ConfiguratorFactory;
0006:        import org.jgroups.conf.ProtocolStackConfigurator;
0007:        import org.jgroups.stack.ProtocolStack;
0008:        import org.jgroups.stack.StateTransferInfo;
0009:        import org.jgroups.stack.IpAddress;
0010:        import org.jgroups.util.*;
0011:        import org.w3c.dom.Element;
0012:
0013:        import java.io.File;
0014:        import java.io.InputStream;
0015:        import java.io.OutputStream;
0016:        import java.io.Serializable;
0017:        import java.net.URL;
0018:        import java.util.HashMap;
0019:        import java.util.Map;
0020:        import java.util.Vector;
0021:
0022:        /**
0023:         * JChannel is a pure Java implementation of Channel.
0024:         * When a JChannel object is instantiated it automatically sets up the
0025:         * protocol stack.
0026:         * <p>
0027:         * <B>Properties</B>
0028:         * <P>
0029:         * Properties are used to configure a channel, and are accepted in
0030:         * several forms; the String form is described here.
0031:         * A property string consists of a number of properties separated by
0032:         * colons.  For example:
0033:         * <p>
0034:         * <pre>"&lt;prop1&gt;(arg1=val1):&lt;prop2&gt;(arg1=val1;arg2=val2):&lt;prop3&gt;:&lt;propn&gt;"</pre>
0035:         * <p>
0036:         * Each property relates directly to a protocol layer, which is
0037:         * implemented as a Java class. When a protocol stack is to be created
0038:         * based on the above property string, the first property becomes the
0039:         * bottom-most layer, the second one will be placed on the first, etc.:
0040:         * the stack is created from the bottom to the top, as the string is
0041:         * parsed from left to right. Each property has to be the name of a
0042:         * Java class that resides in the
0043:         * {@link org.jgroups.protocols} package.
0044:         * <p>
0045:         * Note that only the base name has to be given, not the fully specified
0046:         * class name (e.g., UDP instead of org.jgroups.protocols.UDP).
0047:         * <p>
0048:         * Each layer may have 0 or more arguments, which are specified as a
0049:         * list of name/value pairs in parentheses directly after the property.
0050:         * In the example above, the first protocol layer has 1 argument,
0051:         * the second 2, the third none. When a layer is created, these
0052:         * properties (if there are any) will be set in a layer by invoking
0053:         * the layer's setProperties() method
0054:         * <p>
0055:         * As an example the property string below instructs JGroups to create
0056:         * a JChannel with protocols UDP, PING, FD and GMS:<p>
0057:         * <pre>"UDP(mcast_addr=228.10.9.8;mcast_port=5678):PING:FD:GMS"</pre>
0058:         * <p>
0059:         * The UDP protocol layer is at the bottom of the stack, and it
0060:         * should use mcast address 228.10.9.8. and port 5678 rather than
0061:         * the default IP multicast address and port. The only other argument
0062:         * instructs FD to output debug information while executing.
0063:         * Property UDP refers to a class {@link org.jgroups.protocols.UDP},
0064:         * which is subsequently loaded and an instance of which is created as protocol layer.
0065:         * If any of these classes are not found, an exception will be thrown and
0066:         * the construction of the stack will be aborted.
0067:         *
0068:         * @author Bela Ban
0069:         * @version $Id: JChannel.java,v 1.106.2.1 2006/12/04 22:47:16 vlada Exp $
0070:         */
0071:        public class JChannel extends Channel {
0072:
0073:            /**
0074:             * The default protocol stack used by the default constructor.
0075:             */
0076:            public static final String DEFAULT_PROTOCOL_STACK = "UDP(down_thread=false;mcast_send_buf_size=640000;mcast_port=45566;discard_incompatible_packets=true;"
0077:                    + "ucast_recv_buf_size=20000000;mcast_addr=228.10.10.10;up_thread=false;loopback=false;"
0078:                    + "mcast_recv_buf_size=25000000;max_bundle_size=64000;max_bundle_timeout=30;"
0079:                    + "use_incoming_packet_handler=true;use_outgoing_packet_handler=false;"
0080:                    + "ucast_send_buf_size=640000;tos=16;enable_bundling=true;ip_ttl=2):"
0081:                    + "PING(timeout=2000;down_thread=false;num_initial_members=3;up_thread=false):"
0082:                    + "MERGE2(max_interval=10000;down_thread=false;min_interval=5000;up_thread=false):"
0083:                    + "FD(timeout=2000;max_tries=3;down_thread=false;up_thread=false):"
0084:                    + "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):"
0085:                    + "pbcast.NAKACK(max_xmit_size=60000;down_thread=false;use_mcast_xmit=false;gc_lag=0;"
0086:                    + "discard_delivered_msgs=true;up_thread=false;retransmit_timeout=100,200,300,600,1200,2400,4800):"
0087:                    + "UNICAST(timeout=300,600,1200,2400,3600;down_thread=false;up_thread=false):"
0088:                    + "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=50000;max_bytes=400000;down_thread=false;"
0089:                    + "up_thread=false):"
0090:                    + "VIEW_SYNC(down_thread=false;avg_send_interval=60000;up_thread=false):"
0091:                    + "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;"
0092:                    + "join_retry_timeout=2000;up_thread=false;shun=true):"
0093:                    + "FC(max_credits=2000000;down_thread=false;up_thread=false;min_threshold=0.10):"
0094:                    + "FRAG2(frag_size=60000;down_thread=false;up_thread=false):"
0095:                    + "pbcast.STATE_TRANSFER(down_thread=false;up_thread=false)";
0096:
0097:            static final String FORCE_PROPS = "force.properties";
0098:
0099:            /* the protocol stack configuration string */
0100:            private String props = null;
0101:
0102:            /*the address of this JChannel instance*/
0103:            private Address local_addr = null;
0104:            /*the channel (also know as group) name*/
0105:            private String cluster_name = null; // group name
0106:            /*the latest view of the group membership*/
0107:            private View my_view = null;
0108:            /*the queue that is used to receive messages (events) from the protocol stack*/
0109:            private final Queue mq = new Queue();
0110:            /*the protocol stack, used to send and receive messages from the protocol stack*/
0111:            private ProtocolStack prot_stack = null;
0112:
0113:            /** Thread responsible for closing a channel and potentially reconnecting to it (e.g., when shunned). */
0114:            protected CloserThread closer = null;
0115:
0116:            /** To wait until a local address has been assigned */
0117:            private final Promise local_addr_promise = new Promise();
0118:
0119:            /** To wait until we have connected successfully */
0120:            private final Promise connect_promise = new Promise();
0121:
0122:            /** To wait until we have been disconnected from the channel */
0123:            private final Promise disconnect_promise = new Promise();
0124:
0125:            private final Promise state_promise = new Promise();
0126:
0127:            private final Promise flush_unblock_promise = new Promise();
0128:
0129:            private final Promise flush_promise = new Promise();
0130:
0131:            /** wait until we have a non-null local_addr */
0132:            private long LOCAL_ADDR_TIMEOUT = 30000; //=Long.parseLong(System.getProperty("local_addr.timeout", "30000"));
0133:            /*if the states is fetched automatically, this is the default timeout, 5 secs*/
0134:            private static final long GET_STATE_DEFAULT_TIMEOUT = 5000;
0135:            /*if FLUSH is used channel waits for UNBLOCK event, this is the default timeout, 10 secs*/
0136:            private static final long FLUSH_UNBLOCK_TIMEOUT = 10000;
0137:            /*flag to indicate whether to receive blocks, if this is set to true, receive_views is set to true*/
0138:            private boolean receive_blocks = false;
0139:            /*flag to indicate whether to receive local messages
0140:             *if this is set to false, the JChannel will not receive messages sent by itself*/
0141:            private boolean receive_local_msgs = true;
0142:            /*flag to indicate whether the channel will reconnect (reopen) when the exit message is received*/
0143:            private boolean auto_reconnect = false;
0144:            /*flag t indicate whether the state is supposed to be retrieved after the channel is reconnected
0145:             *setting this to true, automatically forces auto_reconnect to true*/
0146:            private boolean auto_getstate = false;
0147:            /*channel connected flag*/
0148:            protected boolean connected = false;
0149:
0150:            /*channel closed flag*/
0151:            protected boolean closed = false; // close() has been called, channel is unusable
0152:
0153:            /** True if a state transfer protocol is available, false otherwise */
0154:            private boolean state_transfer_supported = false; // set by CONFIG event from STATE_TRANSFER protocol
0155:
0156:            /** True if a flush protocol is available, false otherwise */
0157:            private volatile boolean flush_supported = false; // set by CONFIG event from FLUSH protocol
0158:
0159:            /** Used to maintain additional data across channel disconnects/reconnects. This is a kludge and will be remove
0160:             * as soon as JGroups supports logical addresses
0161:             */
0162:            private byte[] additional_data = null;
0163:
0164:            protected final Log log = LogFactory.getLog(getClass());
0165:
0166:            /** Collect statistics */
0167:            protected boolean stats = true;
0168:
0169:            protected long sent_msgs = 0, received_msgs = 0, sent_bytes = 0,
0170:                    received_bytes = 0;
0171:
0172:            /** Used by subclass to create a JChannel without a protocol stack, don't use as application programmer */
0173:            protected JChannel(boolean no_op) {
0174:                ;
0175:            }
0176:
0177:            /**
0178:             * Constructs a <code>JChannel</code> instance with the protocol stack
0179:             * specified by the <code>DEFAULT_PROTOCOL_STACK</code> member.
0180:             *
0181:             * @throws ChannelException if problems occur during the initialization of
0182:             *                          the protocol stack.
0183:             */
0184:            public JChannel() throws ChannelException {
0185:                this (DEFAULT_PROTOCOL_STACK);
0186:            }
0187:
0188:            /**
0189:             * Constructs a <code>JChannel</code> instance with the protocol stack
0190:             * configuration contained by the specified file.
0191:             *
0192:             * @param properties a file containing a JGroups XML protocol stack
0193:             *                   configuration.
0194:             *
0195:             * @throws ChannelException if problems occur during the configuration or
0196:             *                          initialization of the protocol stack.
0197:             */
0198:            public JChannel(File properties) throws ChannelException {
0199:                this (ConfiguratorFactory.getStackConfigurator(properties));
0200:            }
0201:
0202:            /**
0203:             * Constructs a <code>JChannel</code> instance with the protocol stack
0204:             * configuration contained by the specified XML element.
0205:             *
0206:             * @param properties a XML element containing a JGroups XML protocol stack
0207:             *                   configuration.
0208:             *
0209:             * @throws ChannelException if problems occur during the configuration or
0210:             *                          initialization of the protocol stack.
0211:             */
0212:            public JChannel(Element properties) throws ChannelException {
0213:                this (ConfiguratorFactory.getStackConfigurator(properties));
0214:            }
0215:
0216:            /**
0217:             * Constructs a <code>JChannel</code> instance with the protocol stack
0218:             * configuration indicated by the specified URL.
0219:             *
0220:             * @param properties a URL pointing to a JGroups XML protocol stack
0221:             *                   configuration.
0222:             *
0223:             * @throws ChannelException if problems occur during the configuration or
0224:             *                          initialization of the protocol stack.
0225:             */
0226:            public JChannel(URL properties) throws ChannelException {
0227:                this (ConfiguratorFactory.getStackConfigurator(properties));
0228:            }
0229:
0230:            /**
0231:             * Constructs a <code>JChannel</code> instance with the protocol stack
0232:             * configuration based upon the specified properties parameter.
0233:             *
0234:             * @param properties an old style property string, a string representing a
0235:             *                   system resource containing a JGroups XML configuration,
0236:             *                   a string representing a URL pointing to a JGroups XML
0237:             *                   XML configuration, or a string representing a file name
0238:             *                   that contains a JGroups XML configuration.
0239:             *
0240:             * @throws ChannelException if problems occur during the configuration and
0241:             *                          initialization of the protocol stack.
0242:             */
0243:            public JChannel(String properties) throws ChannelException {
0244:                this (ConfiguratorFactory.getStackConfigurator(properties));
0245:            }
0246:
0247:            /**
0248:             * Constructs a <code>JChannel</code> instance with the protocol stack
0249:             * configuration contained by the protocol stack configurator parameter.
0250:             * <p>
0251:             * All of the public constructors of this class eventually delegate to this
0252:             * method.
0253:             *
0254:             * @param configurator a protocol stack configurator containing a JGroups
0255:             *                     protocol stack configuration.
0256:             *
0257:             * @throws ChannelException if problems occur during the initialization of
0258:             *                          the protocol stack.
0259:             */
0260:            protected JChannel(ProtocolStackConfigurator configurator)
0261:                    throws ChannelException {
0262:                init(configurator);
0263:            }
0264:
0265:            /**
0266:             * Creates a new JChannel with the protocol stack as defined in the properties
0267:             * parameter. an example of this parameter is<BR>
0268:             * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"<BR>
0269:             * Other examples can be found in the ./conf directory<BR>
0270:             * @param properties the protocol stack setup; if null, the default protocol stack will be used.
0271:             * 					 The properties can also be a java.net.URL object or a string that is a URL spec.
0272:             *                   The JChannel will validate any URL object and String object to see if they are a URL.
0273:             *                   In case of the parameter being a url, the JChannel will try to load the xml from there.
0274:             *                   In case properties is a org.w3c.dom.Element, the ConfiguratorFactory will parse the
0275:             *                   DOM tree with the element as its root element.
0276:             * @deprecated Use the constructors with specific parameter types instead.
0277:             */
0278:            public JChannel(Object properties) throws ChannelException {
0279:                if (properties == null)
0280:                    properties = DEFAULT_PROTOCOL_STACK;
0281:
0282:                ProtocolStackConfigurator c = null;
0283:
0284:                try {
0285:                    c = ConfiguratorFactory.getStackConfigurator(properties);
0286:                } catch (Exception x) {
0287:                    throw new ChannelException("unable to load protocol stack",
0288:                            x);
0289:                }
0290:                init(c);
0291:            }
0292:
0293:            /**
0294:             * Returns the protocol stack.
0295:             * Currently used by Debugger.
0296:             * Specific to JChannel, therefore
0297:             * not visible in Channel
0298:             */
0299:            public ProtocolStack getProtocolStack() {
0300:                return prot_stack;
0301:            }
0302:
0303:            protected Log getLog() {
0304:                return log;
0305:            }
0306:
0307:            /**
0308:             * returns the protocol stack configuration in string format.
0309:             * an example of this property is<BR>
0310:             * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"
0311:             */
0312:            public String getProperties() {
0313:                return props;
0314:            }
0315:
0316:            public boolean statsEnabled() {
0317:                return stats;
0318:            }
0319:
0320:            public void enableStats(boolean stats) {
0321:                this .stats = stats;
0322:            }
0323:
0324:            public void resetStats() {
0325:                sent_msgs = received_msgs = sent_bytes = received_bytes = 0;
0326:            }
0327:
0328:            public long getSentMessages() {
0329:                return sent_msgs;
0330:            }
0331:
0332:            public long getSentBytes() {
0333:                return sent_bytes;
0334:            }
0335:
0336:            public long getReceivedMessages() {
0337:                return received_msgs;
0338:            }
0339:
0340:            public long getReceivedBytes() {
0341:                return received_bytes;
0342:            }
0343:
0344:            public int getNumberOfTasksInTimer() {
0345:                return prot_stack != null ? prot_stack.timer.size() : -1;
0346:            }
0347:
0348:            public String dumpTimerQueue() {
0349:                return prot_stack != null ? prot_stack.dumpTimerQueue()
0350:                        : "<n/a";
0351:            }
0352:
0353:            /**
0354:             * Returns a pretty-printed form of all the protocols. If include_properties is set,
0355:             * the properties for each protocol will also be printed.
0356:             */
0357:            public String printProtocolSpec(boolean include_properties) {
0358:                return prot_stack != null ? prot_stack
0359:                        .printProtocolSpec(include_properties) : null;
0360:            }
0361:
0362:            /**
0363:             * Connects the channel to a group.
0364:             * If the channel is already connected, an error message will be printed to the error log.
0365:             * If the channel is closed a ChannelClosed exception will be thrown.
0366:             * This method starts the protocol stack by calling ProtocolStack.start,
0367:             * then it sends an Event.CONNECT event down the stack and waits to receive a CONNECT_OK event.
0368:             * Once the CONNECT_OK event arrives from the protocol stack, any channel listeners are notified
0369:             * and the channel is considered connected.
0370:             *
0371:             * @param cluster_name A <code>String</code> denoting the group name. Cannot be null.
0372:             * @exception ChannelException The protocol stack cannot be started
0373:             * @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
0374:             *                                   A new channel has to be created first.
0375:             */
0376:            public synchronized void connect(String cluster_name)
0377:                    throws ChannelException, ChannelClosedException {
0378:                /*make sure the channel is not closed*/
0379:                checkClosed();
0380:
0381:                /*if we already are connected, then ignore this*/
0382:                if (connected) {
0383:                    if (log.isTraceEnabled())
0384:                        log.trace("already connected to " + cluster_name);
0385:                    return;
0386:                }
0387:
0388:                /*make sure we have a valid channel name*/
0389:                if (cluster_name == null) {
0390:                    if (log.isInfoEnabled())
0391:                        log
0392:                                .info("cluster_name is null, assuming unicast channel");
0393:                } else
0394:                    this .cluster_name = cluster_name;
0395:
0396:                try {
0397:                    prot_stack.startStack(); // calls start() in all protocols, from top to bottom
0398:                } catch (Throwable e) {
0399:                    throw new ChannelException(
0400:                            "failed to start protocol stack", e);
0401:                }
0402:
0403:                String tmp = Util.getProperty(
0404:                        new String[] { Global.CHANNEL_LOCAL_ADDR_TIMEOUT,
0405:                                "local_addr.timeout" }, null, null, false,
0406:                        "30000");
0407:                LOCAL_ADDR_TIMEOUT = Long.parseLong(tmp);
0408:
0409:                /* Wait LOCAL_ADDR_TIMEOUT milliseconds for local_addr to have a non-null value (set by SET_LOCAL_ADDRESS) */
0410:                local_addr = (Address) local_addr_promise
0411:                        .getResult(LOCAL_ADDR_TIMEOUT);
0412:                if (local_addr == null) {
0413:                    log.fatal("local_addr is null; cannot connect");
0414:                    throw new ChannelException("local_addr is null");
0415:                }
0416:
0417:                /*create a temporary view, assume this channel is the only member and
0418:                 *is the coordinator*/
0419:                Vector t = new Vector(1);
0420:                t.addElement(local_addr);
0421:                my_view = new View(local_addr, 0, t); // create a dummy view
0422:
0423:                // only connect if we are not a unicast channel
0424:                if (cluster_name != null) {
0425:                    connect_promise.reset();
0426:
0427:                    if (flush_supported)
0428:                        flush_unblock_promise.reset();
0429:
0430:                    Event connect_event = new Event(Event.CONNECT, cluster_name);
0431:                    down(connect_event);
0432:
0433:                    Object res = connect_promise.getResult(); // waits forever until connected (or channel is closed)
0434:                    if (res != null && res instanceof  Exception) { // the JOIN was rejected by the coordinator
0435:                        throw new ChannelException("connect() failed",
0436:                                (Throwable) res);
0437:                    }
0438:
0439:                    //if FLUSH is used do not return from connect() until UNBLOCK event is received
0440:                    boolean singletonMember = my_view != null
0441:                            && my_view.size() == 1;
0442:                    boolean shouldWaitForUnblock = flush_supported
0443:                            && receive_blocks && !singletonMember
0444:                            && !flush_unblock_promise.hasResult();
0445:                    if (shouldWaitForUnblock) {
0446:                        try {
0447:                            flush_unblock_promise
0448:                                    .getResultWithTimeout(FLUSH_UNBLOCK_TIMEOUT);
0449:                        } catch (TimeoutException te) {
0450:                            if (log.isWarnEnabled())
0451:                                log
0452:                                        .warn("waiting on UNBLOCK after connect timed out");
0453:                        }
0454:                    }
0455:                }
0456:                connected = true;
0457:                notifyChannelConnected(this );
0458:            }
0459:
0460:            public synchronized boolean connect(String cluster_name,
0461:                    Address target, String state_id, long timeout)
0462:                    throws ChannelException {
0463:                throw new UnsupportedOperationException("not yet implemented");
0464:            }
0465:
0466:            /**
0467:             * Disconnects the channel if it is connected. If the channel is closed, this operation is ignored<BR>
0468:             * Otherwise the following actions happen in the listed order<BR>
0469:             * <ol>
0470:             * <li> The JChannel sends a DISCONNECT event down the protocol stack<BR>
0471:             * <li> Blocks until the channel to receives a DISCONNECT_OK event<BR>
0472:             * <li> Sends a STOP_QUEING event down the stack<BR>
0473:             * <li> Stops the protocol stack by calling ProtocolStack.stop()<BR>
0474:             * <li> Notifies the listener, if the listener is available<BR>
0475:             * </ol>
0476:             */
0477:            public synchronized void disconnect() {
0478:                if (closed)
0479:                    return;
0480:
0481:                if (connected) {
0482:
0483:                    if (cluster_name != null) {
0484:
0485:                        /* Send down a DISCONNECT event. The DISCONNECT event travels down to the GMS, where a
0486:                         *  DISCONNECT_OK response is generated and sent up the stack. JChannel blocks until a
0487:                         *  DISCONNECT_OK has been received, or until timeout has elapsed.
0488:                         */
0489:                        Event disconnect_event = new Event(Event.DISCONNECT,
0490:                                local_addr);
0491:                        disconnect_promise.reset();
0492:                        down(disconnect_event); // DISCONNECT is handled by each layer
0493:                        disconnect_promise.getResult(); // wait for DISCONNECT_OK
0494:                    }
0495:
0496:                    // Just in case we use the QUEUE protocol and it is still blocked...
0497:                    down(new Event(Event.STOP_QUEUEING));
0498:
0499:                    connected = false;
0500:                    try {
0501:                        prot_stack.stopStack(); // calls stop() in all protocols, from top to bottom
0502:                    } catch (Exception e) {
0503:                        if (log.isErrorEnabled())
0504:                            log.error("exception: " + e);
0505:                    }
0506:                    notifyChannelDisconnected(this );
0507:                    init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
0508:                }
0509:            }
0510:
0511:            /**
0512:             * Destroys the channel.
0513:             * After this method has been called, the channel us unusable.<BR>
0514:             * This operation will disconnect the channel and close the channel receive queue immediately<BR>
0515:             */
0516:            public synchronized void close() {
0517:                _close(true, true); // by default disconnect before closing channel and close mq
0518:            }
0519:
0520:            /** Shuts down the channel without disconnecting */
0521:            public synchronized void shutdown() {
0522:                _close(false, true); // by default disconnect before closing channel and close mq
0523:            }
0524:
0525:            /**
0526:             * Opens the channel.
0527:             * This does the following actions:
0528:             * <ol>
0529:             * <li> Resets the receiver queue by calling Queue.reset
0530:             * <li> Sets up the protocol stack by calling ProtocolStack.setup
0531:             * <li> Sets the closed flag to false
0532:             * </ol>
0533:             */
0534:            public synchronized void open() throws ChannelException {
0535:                if (!closed)
0536:                    throw new ChannelException("channel is already open");
0537:
0538:                try {
0539:                    mq.reset();
0540:
0541:                    // new stack is created on open() - bela June 12 2003
0542:                    prot_stack = new ProtocolStack(this , props);
0543:                    prot_stack.setup();
0544:                    closed = false;
0545:                } catch (Exception e) {
0546:                    throw new ChannelException("failed to open channel", e);
0547:                }
0548:            }
0549:
0550:            /**
0551:             * returns true if the Open operation has been called successfully
0552:             */
0553:            public boolean isOpen() {
0554:                return !closed;
0555:            }
0556:
0557:            /**
0558:             * returns true if the Connect operation has been called successfully
0559:             */
0560:            public boolean isConnected() {
0561:                return connected;
0562:            }
0563:
0564:            public int getNumMessages() {
0565:                return mq != null ? mq.size() : -1;
0566:            }
0567:
0568:            public String dumpQueue() {
0569:                return Util.dumpQueue(mq);
0570:            }
0571:
0572:            /**
0573:             * Returns a map of statistics of the various protocols and of the channel itself.
0574:             * @return Map<String,Map>. A map where the keys are the protocols ("channel" pseudo key is
0575:             * used for the channel itself") and the values are property maps.
0576:             */
0577:            public Map dumpStats() {
0578:                Map retval = prot_stack.dumpStats();
0579:                if (retval != null) {
0580:                    Map tmp = dumpChannelStats();
0581:                    if (tmp != null)
0582:                        retval.put("channel", tmp);
0583:                }
0584:                return retval;
0585:            }
0586:
0587:            private Map dumpChannelStats() {
0588:                Map retval = new HashMap();
0589:                retval.put("sent_msgs", new Long(sent_msgs));
0590:                retval.put("sent_bytes", new Long(sent_bytes));
0591:                retval.put("received_msgs", new Long(received_msgs));
0592:                retval.put("received_bytes", new Long(received_bytes));
0593:                return retval;
0594:            }
0595:
0596:            /**
0597:             * Sends a message through the protocol stack.
0598:             * Implements the Transport interface.
0599:             *
0600:             * @param msg the message to be sent through the protocol stack,
0601:             *        the destination of the message is specified inside the message itself
0602:             * @exception ChannelNotConnectedException
0603:             * @exception ChannelClosedException
0604:             */
0605:            public void send(Message msg) throws ChannelNotConnectedException,
0606:                    ChannelClosedException {
0607:                checkClosed();
0608:                checkNotConnected();
0609:                if (stats) {
0610:                    sent_msgs++;
0611:                    sent_bytes += msg.getLength();
0612:                }
0613:                if (msg == null)
0614:                    throw new NullPointerException("msg is null");
0615:                down(new Event(Event.MSG, msg));
0616:            }
0617:
0618:            /**
0619:             * creates a new message with the destination address, and the source address
0620:             * and the object as the message value
0621:             * @param dst - the destination address of the message, null for all members
0622:             * @param src - the source address of the message
0623:             * @param obj - the value of the message
0624:             * @exception ChannelNotConnectedException
0625:             * @exception ChannelClosedException
0626:             * @see JChannel#send
0627:             */
0628:            public void send(Address dst, Address src, Serializable obj)
0629:                    throws ChannelNotConnectedException, ChannelClosedException {
0630:                send(new Message(dst, src, obj));
0631:            }
0632:
0633:            /**
0634:             * Blocking receive method.
0635:             * This method returns the object that was first received by this JChannel and that has not been
0636:             * received before. After the object is received, it is removed from the receive queue.<BR>
0637:             * If you only want to inspect the object received without removing it from the queue call
0638:             * JChannel.peek<BR>
0639:             * If no messages are in the receive queue, this method blocks until a message is added or the operation times out<BR>
0640:             * By specifying a timeout of 0, the operation blocks forever, or until a message has been received.
0641:             * @param timeout the number of milliseconds to wait if the receive queue is empty. 0 means wait forever
0642:             * @exception TimeoutException if a timeout occured prior to a new message was received
0643:             * @exception ChannelNotConnectedException
0644:             * @exception ChannelClosedException
0645:             * @see JChannel#peek
0646:             */
0647:            public Object receive(long timeout)
0648:                    throws ChannelNotConnectedException,
0649:                    ChannelClosedException, TimeoutException {
0650:
0651:                checkClosed();
0652:                checkNotConnected();
0653:
0654:                try {
0655:                    Event evt = (timeout <= 0) ? (Event) mq.remove()
0656:                            : (Event) mq.remove(timeout);
0657:                    Object retval = getEvent(evt);
0658:                    evt = null;
0659:                    if (stats) {
0660:                        if (retval != null && retval instanceof  Message) {
0661:                            received_msgs++;
0662:                            received_bytes += ((Message) retval).getLength();
0663:                        }
0664:                    }
0665:                    return retval;
0666:                } catch (QueueClosedException queue_closed) {
0667:                    throw new ChannelClosedException();
0668:                } catch (TimeoutException t) {
0669:                    throw t;
0670:                } catch (Exception e) {
0671:                    if (log.isErrorEnabled())
0672:                        log.error("exception: " + e);
0673:                    return null;
0674:                }
0675:            }
0676:
0677:            /**
0678:             * Just peeks at the next message, view or block. Does <em>not</em> install
0679:             * new view if view is received<BR>
0680:             * Does the same thing as JChannel.receive but doesn't remove the object from the
0681:             * receiver queue
0682:             */
0683:            public Object peek(long timeout)
0684:                    throws ChannelNotConnectedException,
0685:                    ChannelClosedException, TimeoutException {
0686:
0687:                checkClosed();
0688:                checkNotConnected();
0689:
0690:                try {
0691:                    Event evt = (timeout <= 0) ? (Event) mq.peek() : (Event) mq
0692:                            .peek(timeout);
0693:                    Object retval = getEvent(evt);
0694:                    evt = null;
0695:                    return retval;
0696:                } catch (QueueClosedException queue_closed) {
0697:                    if (log.isErrorEnabled())
0698:                        log.error("exception: " + queue_closed);
0699:                    return null;
0700:                } catch (TimeoutException t) {
0701:                    return null;
0702:                } catch (Exception e) {
0703:                    if (log.isErrorEnabled())
0704:                        log.error("exception: " + e);
0705:                    return null;
0706:                }
0707:            }
0708:
0709:            /**
0710:             * Returns the current view.
0711:             * <BR>
0712:             * If the channel is not connected or if it is closed it will return null.
0713:             * <BR>
0714:             * @return returns the current group view, or null if the channel is closed or disconnected
0715:             */
0716:            public View getView() {
0717:                return closed || !connected ? null : my_view;
0718:            }
0719:
0720:            /**
0721:             * returns the local address of the channel
0722:             * returns null if the channel is closed
0723:             */
0724:            public Address getLocalAddress() {
0725:                return closed ? null : local_addr;
0726:            }
0727:
0728:            /**
0729:             * returns the name of the channel
0730:             * if the channel is not connected or if it is closed it will return null
0731:             * @deprecated Use {@link #getClusterName()} instead
0732:             */
0733:            public String getChannelName() {
0734:                return closed ? null : !connected ? null : cluster_name;
0735:            }
0736:
0737:            public String getClusterName() {
0738:                return cluster_name;
0739:            }
0740:
0741:            /**
0742:             * Sets a channel option.  The options can be one of the following:
0743:             * <UL>
0744:             * <LI>    Channel.BLOCK
0745:             * <LI>    Channel.LOCAL
0746:             * <LI>    Channel.AUTO_RECONNECT
0747:             * <LI>    Channel.AUTO_GETSTATE
0748:             * </UL>
0749:             * <P>
0750:             * There are certain dependencies between the options that you can set,
0751:             * I will try to describe them here.
0752:             * <P>
0753:             * Option: Channel.BLOCK<BR>
0754:             * Value:  java.lang.Boolean<BR>
0755:             * Result: set to true will set setOpt(VIEW, true) and the JChannel will receive BLOCKS and VIEW events<BR>
0756:             *<BR>
0757:             * Option: LOCAL<BR>
0758:             * Value:  java.lang.Boolean<BR>
0759:             * Result: set to true the JChannel will receive messages that it self sent out.<BR>
0760:             *<BR>
0761:             * Option: AUTO_RECONNECT<BR>
0762:             * Value:  java.lang.Boolean<BR>
0763:             * Result: set to true and the JChannel will try to reconnect when it is being closed<BR>
0764:             *<BR>
0765:             * Option: AUTO_GETSTATE<BR>
0766:             * Value:  java.lang.Boolean<BR>
0767:             * Result: set to true, the AUTO_RECONNECT will be set to true and the JChannel will try to get the state after a close and reconnect happens<BR>
0768:             * <BR>
0769:             *
0770:             * @param option the parameter option Channel.VIEW, Channel.SUSPECT, etc
0771:             * @param value the value to set for this option
0772:             *
0773:             */
0774:            public void setOpt(int option, Object value) {
0775:                if (closed) {
0776:                    if (log.isWarnEnabled())
0777:                        log.warn("channel is closed; option not set !");
0778:                    return;
0779:                }
0780:
0781:                switch (option) {
0782:                case VIEW:
0783:                    if (log.isWarnEnabled())
0784:                        log
0785:                                .warn("option VIEW has been deprecated (it is always true now); this option is ignored");
0786:                    break;
0787:                case SUSPECT:
0788:                    if (log.isWarnEnabled())
0789:                        log
0790:                                .warn("option SUSPECT has been deprecated (it is always true now); this option is ignored");
0791:                    break;
0792:                case BLOCK:
0793:                    if (value instanceof  Boolean)
0794:                        receive_blocks = ((Boolean) value).booleanValue();
0795:                    else if (log.isErrorEnabled())
0796:                        log.error("option " + Channel.option2String(option)
0797:                                + " (" + value + "): value has to be Boolean");
0798:                    break;
0799:
0800:                case GET_STATE_EVENTS:
0801:                    if (log.isWarnEnabled())
0802:                        log
0803:                                .warn("option GET_STATE_EVENTS has been deprecated (it is always true now); this option is ignored");
0804:                    break;
0805:
0806:                case LOCAL:
0807:                    if (value instanceof  Boolean)
0808:                        receive_local_msgs = ((Boolean) value).booleanValue();
0809:                    else if (log.isErrorEnabled())
0810:                        log.error("option " + Channel.option2String(option)
0811:                                + " (" + value + "): value has to be Boolean");
0812:                    break;
0813:
0814:                case AUTO_RECONNECT:
0815:                    if (value instanceof  Boolean)
0816:                        auto_reconnect = ((Boolean) value).booleanValue();
0817:                    else if (log.isErrorEnabled())
0818:                        log.error("option " + Channel.option2String(option)
0819:                                + " (" + value + "): value has to be Boolean");
0820:                    break;
0821:
0822:                case AUTO_GETSTATE:
0823:                    if (value instanceof  Boolean) {
0824:                        auto_getstate = ((Boolean) value).booleanValue();
0825:                        if (auto_getstate)
0826:                            auto_reconnect = true;
0827:                    } else if (log.isErrorEnabled())
0828:                        log.error("option " + Channel.option2String(option)
0829:                                + " (" + value + "): value has to be Boolean");
0830:                    break;
0831:
0832:                default:
0833:                    if (log.isErrorEnabled())
0834:                        log.error("option " + Channel.option2String(option)
0835:                                + " not known");
0836:                    break;
0837:                }
0838:            }
0839:
0840:            /**
0841:             * returns the value of an option.
0842:             * @param option the option you want to see the value for
0843:             * @return the object value, in most cases java.lang.Boolean
0844:             * @see JChannel#setOpt
0845:             */
0846:            public Object getOpt(int option) {
0847:                switch (option) {
0848:                case VIEW:
0849:                    return Boolean.TRUE;
0850:                case BLOCK:
0851:                    return receive_blocks ? Boolean.TRUE : Boolean.FALSE;
0852:                case SUSPECT:
0853:                    return Boolean.TRUE;
0854:                case AUTO_RECONNECT:
0855:                    return auto_reconnect ? Boolean.TRUE : Boolean.FALSE;
0856:                case AUTO_GETSTATE:
0857:                    return auto_getstate ? Boolean.TRUE : Boolean.FALSE;
0858:                case GET_STATE_EVENTS:
0859:                    return Boolean.TRUE;
0860:                case LOCAL:
0861:                    return receive_local_msgs ? Boolean.TRUE : Boolean.FALSE;
0862:                default:
0863:                    if (log.isErrorEnabled())
0864:                        log.error("option " + Channel.option2String(option)
0865:                                + " not known");
0866:                    return null;
0867:                }
0868:            }
0869:
0870:            /**
0871:             * Called to acknowledge a block() (callback in <code>MembershipListener</code> or
0872:             * <code>BlockEvent</code> received from call to <code>receive()</code>).
0873:             * After sending blockOk(), no messages should be sent until a new view has been received.
0874:             * Calling this method on a closed channel has no effect.
0875:             */
0876:            public void blockOk() {
0877:                down(new Event(Event.BLOCK_OK));
0878:                down(new Event(Event.START_QUEUEING));
0879:            }
0880:
0881:            /**
0882:             * Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
0883:             * Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
0884:             * milliseconds have elapsed. The argument of GET_STATE_OK should be a single object.
0885:             * @param target the target member to receive the state from. if null, state is retrieved from coordinator
0886:             * @param timeout the number of milliseconds to wait for the operation to complete successfully. 0 waits until
0887:             * the state has been received
0888:             * @return true of the state was received, false if the operation timed out
0889:             */
0890:            public boolean getState(Address target, long timeout)
0891:                    throws ChannelNotConnectedException, ChannelClosedException {
0892:                return getState(target, null, timeout);
0893:            }
0894:
0895:            /**
0896:             * Retrieves a substate (or partial state) from the target.
0897:             * @param target State provider. If null, coordinator is used
0898:             * @param state_id The ID of the substate. If null, the entire state will be transferred
0899:             * @param timeout the number of milliseconds to wait for the operation to complete successfully. 0 waits until
0900:             * the state has been received
0901:             * @return
0902:             * @throws ChannelNotConnectedException
0903:             * @throws ChannelClosedException
0904:             */
0905:            public boolean getState(Address target, String state_id,
0906:                    long timeout) throws ChannelNotConnectedException,
0907:                    ChannelClosedException {
0908:                if (target == null)
0909:                    target = determineCoordinator();
0910:                if (target != null && local_addr != null
0911:                        && target.equals(local_addr)) {
0912:                    if (log.isTraceEnabled())
0913:                        log.trace("cannot get state from myself (" + target
0914:                                + "): probably the first member");
0915:                    return false;
0916:                }
0917:
0918:                StateTransferInfo info = new StateTransferInfo(target,
0919:                        state_id, timeout);
0920:                boolean rc = _getState(new Event(Event.GET_STATE, info), info);
0921:                if (rc == false)
0922:                    down(new Event(Event.RESUME_STABLE));
0923:                return rc;
0924:            }
0925:
0926:            /**
0927:             * Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
0928:             * Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
0929:             * milliseconds have elapsed. The argument of GET_STATE_OK should be a vector of objects.
0930:             * @param targets - the target members to receive the state from ( an Address list )
0931:             * @param timeout - the number of milliseconds to wait for the operation to complete successfully
0932:             * @return true of the state was received, false if the operation timed out
0933:             * @deprecated Not really needed - we always want to get the state from a single member,
0934:             * use {@link #getState(org.jgroups.Address, long)} instead
0935:             */
0936:            public boolean getAllStates(Vector targets, long timeout)
0937:                    throws ChannelNotConnectedException, ChannelClosedException {
0938:                throw new UnsupportedOperationException(
0939:                        "use getState() instead");
0940:            }
0941:
0942:            /**
0943:             * Called by the application is response to receiving a <code>getState()</code> object when
0944:             * calling <code>receive()</code>.
0945:             * When the application receives a getState() message on the receive() method,
0946:             * it should call returnState() to reply with the state of the application
0947:             * @param state The state of the application as a byte buffer
0948:             *              (to send over the network).
0949:             */
0950:            public void returnState(byte[] state) {
0951:                StateTransferInfo info = new StateTransferInfo(null, null, 0L,
0952:                        state);
0953:                down(new Event(Event.GET_APPLSTATE_OK, info));
0954:            }
0955:
0956:            /**
0957:             * Returns a substate as indicated by state_id
0958:             * @param state
0959:             * @param state_id
0960:             */
0961:            public void returnState(byte[] state, String state_id) {
0962:                StateTransferInfo info = new StateTransferInfo(null, state_id,
0963:                        0L, state);
0964:                down(new Event(Event.GET_APPLSTATE_OK, info));
0965:            }
0966:
0967:            /**
0968:             * Callback method <BR>
0969:             * Called by the ProtocolStack when a message is received.
0970:             * It will be added to the message queue from which subsequent
0971:             * <code>Receive</code>s will dequeue it.
0972:             * @param evt the event carrying the message from the protocol stack
0973:             */
0974:            public void up(Event evt) {
0975:                int type = evt.getType();
0976:                Message msg;
0977:
0978:                switch (type) {
0979:
0980:                case Event.MSG:
0981:                    msg = (Message) evt.getArg();
0982:                    if (!receive_local_msgs) { // discard local messages (sent by myself to me)
0983:                        if (local_addr != null && msg.getSrc() != null)
0984:                            if (local_addr.equals(msg.getSrc()))
0985:                                return;
0986:                    }
0987:                    break;
0988:
0989:                case Event.VIEW_CHANGE:
0990:                    View tmp = (View) evt.getArg();
0991:                    if (tmp instanceof  MergeView)
0992:                        my_view = new View(tmp.getVid(), tmp.getMembers());
0993:                    else
0994:                        my_view = tmp;
0995:
0996:                    /*
0997:                     * Bela&Vladimir Oct 27th,2006 (JGroups 2.4)- we need to switch to 
0998:                     * connected=true because client can invoke channel.getView() in 
0999:                     * viewAccepted() callback invoked on this thread 
1000:                     * (see Event.VIEW_CHANGE handling below)
1001:                     * 
1002:                     * We do not set connect_promise because we want to wait for
1003:                     * CONNECT_OK and then return from user's JChannel.connect() call. 
1004:                     * This is important since we have to wait for Event.UNBLOCK after 
1005:                     * CONNECT_OK if blocks are turned on. See JChannel.connect() for 
1006:                     * details.
1007:                     *
1008:                     */
1009:                    if (connected == false) {
1010:                        connected = true;
1011:                    }
1012:
1013:                    // unblock queueing of messages due to previous BLOCK event:
1014:                    down(new Event(Event.STOP_QUEUEING));
1015:                    break;
1016:
1017:                case Event.CONFIG:
1018:                    HashMap config = (HashMap) evt.getArg();
1019:                    if (config != null) {
1020:                        if (config.containsKey("state_transfer")) {
1021:                            state_transfer_supported = ((Boolean) config
1022:                                    .get("state_transfer")).booleanValue();
1023:                        }
1024:                        if (config.containsKey("flush_supported")) {
1025:                            flush_supported = ((Boolean) config
1026:                                    .get("flush_supported")).booleanValue();
1027:                        }
1028:                    }
1029:                    break;
1030:
1031:                case Event.CONNECT_OK:
1032:                    connect_promise.setResult(evt.getArg());
1033:                    break;
1034:
1035:                case Event.SUSPEND_OK:
1036:                    flush_promise.setResult(Boolean.TRUE);
1037:                    break;
1038:
1039:                case Event.DISCONNECT_OK:
1040:                    disconnect_promise.setResult(Boolean.TRUE);
1041:                    break;
1042:
1043:                case Event.GET_STATE_OK:
1044:                    StateTransferInfo info = (StateTransferInfo) evt.getArg();
1045:                    byte[] state = info.state;
1046:
1047:                    state_promise.setResult(state != null ? Boolean.TRUE
1048:                            : Boolean.FALSE);
1049:                    if (up_handler != null) {
1050:                        up_handler.up(evt);
1051:                        return;
1052:                    }
1053:
1054:                    if (state != null) {
1055:                        String state_id = info.state_id;
1056:                        if (receiver != null) {
1057:                            if (receiver instanceof  ExtendedReceiver
1058:                                    && state_id != null)
1059:                                ((ExtendedReceiver) receiver).setState(
1060:                                        state_id, state);
1061:                            else
1062:                                receiver.setState(state);
1063:                        } else {
1064:                            try {
1065:                                mq.add(new Event(Event.STATE_RECEIVED, info));
1066:                            } catch (Exception e) {
1067:                            }
1068:                        }
1069:                    }
1070:                    break;
1071:
1072:                case Event.STATE_TRANSFER_INPUTSTREAM:
1073:                    StateTransferInfo sti = (StateTransferInfo) evt.getArg();
1074:                    InputStream is = sti.inputStream;
1075:                    //Oct 13,2006 moved to down() when Event.STATE_TRANSFER_INPUTSTREAM_CLOSED is received
1076:                    //state_promise.setResult(is != null? Boolean.TRUE : Boolean.FALSE);
1077:
1078:                    if (up_handler != null) {
1079:                        up_handler.up(evt);
1080:                        return;
1081:                    }
1082:
1083:                    if (is != null) {
1084:                        if (receiver instanceof  ExtendedReceiver) {
1085:                            if (sti.state_id == null)
1086:                                ((ExtendedReceiver) receiver).setState(is);
1087:                            else
1088:                                ((ExtendedReceiver) receiver).setState(
1089:                                        sti.state_id, is);
1090:                        } else {
1091:                            try {
1092:                                mq.add(new Event(
1093:                                        Event.STATE_TRANSFER_INPUTSTREAM, sti));
1094:                            } catch (Exception e) {
1095:                            }
1096:                        }
1097:                    }
1098:                    break;
1099:
1100:                case Event.SET_LOCAL_ADDRESS:
1101:                    local_addr_promise.setResult(evt.getArg());
1102:                    break;
1103:
1104:                case Event.EXIT:
1105:                    handleExit(evt);
1106:                    return; // no need to pass event up; already done in handleExit()
1107:
1108:                default:
1109:                    break;
1110:                }
1111:
1112:                // If UpHandler is installed, pass all events to it and return (UpHandler is e.g. a building block)
1113:                if (up_handler != null) {
1114:                    up_handler.up(evt);
1115:
1116:                    if (type == Event.UNBLOCK) {
1117:                        flush_unblock_promise.setResult(Boolean.TRUE);
1118:                    }
1119:                    return;
1120:                }
1121:
1122:                switch (type) {
1123:                case Event.MSG:
1124:                    if (receiver != null) {
1125:                        receiver.receive((Message) evt.getArg());
1126:                        return;
1127:                    }
1128:                    break;
1129:                case Event.VIEW_CHANGE:
1130:                    if (receiver != null) {
1131:                        receiver.viewAccepted((View) evt.getArg());
1132:                        return;
1133:                    }
1134:                    break;
1135:                case Event.SUSPECT:
1136:                    if (receiver != null) {
1137:                        receiver.suspect((Address) evt.getArg());
1138:                        return;
1139:                    }
1140:                    break;
1141:                case Event.GET_APPLSTATE:
1142:                    if (receiver != null) {
1143:                        StateTransferInfo info = (StateTransferInfo) evt
1144:                                .getArg();
1145:                        byte[] tmp_state;
1146:                        String state_id = info.state_id;
1147:                        if (receiver instanceof  ExtendedReceiver
1148:                                && state_id != null) {
1149:                            tmp_state = ((ExtendedReceiver) receiver)
1150:                                    .getState(state_id);
1151:                        } else {
1152:                            tmp_state = receiver.getState();
1153:                        }
1154:                        returnState(tmp_state, state_id);
1155:                        return;
1156:                    }
1157:                    break;
1158:                case Event.STATE_TRANSFER_OUTPUTSTREAM:
1159:                    if (receiver != null) {
1160:                        StateTransferInfo sti = (StateTransferInfo) evt
1161:                                .getArg();
1162:                        OutputStream os = sti.outputStream;
1163:                        if (os != null && receiver instanceof  ExtendedReceiver) {
1164:                            if (sti.state_id == null)
1165:                                ((ExtendedReceiver) receiver).getState(os);
1166:                            else
1167:                                ((ExtendedReceiver) receiver).getState(
1168:                                        sti.state_id, os);
1169:                        }
1170:                        return;
1171:                    }
1172:                    break;
1173:
1174:                case Event.BLOCK:
1175:                    if (!receive_blocks) { // discard if client has not set 'receiving blocks' to 'on'
1176:                        down(new Event(Event.BLOCK_OK));
1177:                        down(new Event(Event.START_QUEUEING));
1178:                        return;
1179:                    }
1180:
1181:                    if (receiver != null) {
1182:                        try {
1183:                            receiver.block();
1184:                        } catch (Throwable t) {
1185:                            if (log.isErrorEnabled())
1186:                                log.error("failed calling block() on Receiver",
1187:                                        t);
1188:                        } finally {
1189:                            blockOk();
1190:                        }
1191:                        return;
1192:                    }
1193:                    break;
1194:                case Event.UNBLOCK:
1195:                    //discard if client has not set 'receiving blocks' to 'on'
1196:                    if (!receive_blocks) {
1197:                        return;
1198:                    }
1199:                    if (receiver instanceof  ExtendedReceiver) {
1200:                        try {
1201:                            ((ExtendedReceiver) receiver).unblock();
1202:                        } catch (Throwable t) {
1203:                            if (log.isErrorEnabled())
1204:                                log.error(
1205:                                        "failed calling unblock() on Receiver",
1206:                                        t);
1207:                        } finally {
1208:                            flush_unblock_promise.setResult(Boolean.TRUE);
1209:                        }
1210:                        return;
1211:                    }
1212:                    break;
1213:                default:
1214:                    break;
1215:                }
1216:
1217:                if (type == Event.MSG || type == Event.VIEW_CHANGE
1218:                        || type == Event.SUSPECT || type == Event.GET_APPLSTATE
1219:                        || type == Event.STATE_TRANSFER_OUTPUTSTREAM
1220:                        || type == Event.BLOCK || type == Event.UNBLOCK) {
1221:                    try {
1222:                        mq.add(evt);
1223:                    } catch (Exception e) {
1224:                        if (log.isErrorEnabled())
1225:                            log.error("exception adding event " + evt
1226:                                    + " to message queue", e);
1227:                    }
1228:                }
1229:            }
1230:
1231:            /**
1232:             * Sends a message through the protocol stack if the stack is available
1233:             * @param evt the message to send down, encapsulated in an event
1234:             */
1235:            public void down(Event evt) {
1236:                if (evt == null)
1237:                    return;
1238:
1239:                // handle setting of additional data (kludge, will be removed soon)
1240:                if (evt.getType() == Event.CONFIG) {
1241:                    try {
1242:                        Map m = (Map) evt.getArg();
1243:                        if (m != null && m.containsKey("additional_data")) {
1244:                            additional_data = (byte[]) m.get("additional_data");
1245:                            if (local_addr instanceof  IpAddress)
1246:                                ((IpAddress) local_addr)
1247:                                        .setAdditionalData(additional_data);
1248:                        }
1249:                    } catch (Throwable t) {
1250:                        if (log.isErrorEnabled())
1251:                            log
1252:                                    .error("CONFIG event did not contain a hashmap: "
1253:                                            + t);
1254:                    }
1255:                }
1256:
1257:                if (evt.getType() == Event.STATE_TRANSFER_INPUTSTREAM_CLOSED) {
1258:                    state_promise.setResult(Boolean.TRUE);
1259:                }
1260:
1261:                if (prot_stack != null)
1262:                    prot_stack.down(evt);
1263:                else if (log.isErrorEnabled())
1264:                    log.error("no protocol stack available");
1265:            }
1266:
1267:            public String toString(boolean details) {
1268:                StringBuffer sb = new StringBuffer();
1269:                sb.append("local_addr=").append(local_addr).append('\n');
1270:                sb.append("cluster_name=").append(cluster_name).append('\n');
1271:                sb.append("my_view=").append(my_view).append('\n');
1272:                sb.append("connected=").append(connected).append('\n');
1273:                sb.append("closed=").append(closed).append('\n');
1274:                if (mq != null)
1275:                    sb.append("incoming queue size=").append(mq.size()).append(
1276:                            '\n');
1277:                if (details) {
1278:                    sb.append("receive_blocks=").append(receive_blocks).append(
1279:                            '\n');
1280:                    sb.append("receive_local_msgs=").append(receive_local_msgs)
1281:                            .append('\n');
1282:                    sb.append("auto_reconnect=").append(auto_reconnect).append(
1283:                            '\n');
1284:                    sb.append("auto_getstate=").append(auto_getstate).append(
1285:                            '\n');
1286:                    sb.append("state_transfer_supported=").append(
1287:                            state_transfer_supported).append('\n');
1288:                    sb.append("props=").append(props).append('\n');
1289:                }
1290:
1291:                return sb.toString();
1292:            }
1293:
1294:            /* ----------------------------------- Private Methods ------------------------------------- */
1295:
1296:            protected final void init(ProtocolStackConfigurator configurator)
1297:                    throws ChannelException {
1298:                if (log.isInfoEnabled())
1299:                    log.info("JGroups version: " + Version.description);
1300:                ConfiguratorFactory.substituteVariables(configurator); // replace vars with system props
1301:                props = configurator.getProtocolStackString();
1302:                prot_stack = new ProtocolStack(this , props);
1303:                try {
1304:                    prot_stack.setup(); // Setup protocol stack (create layers, queues between them
1305:                } catch (Throwable e) {
1306:                    throw new ChannelException(
1307:                            "unable to setup the protocol stack", e);
1308:                }
1309:            }
1310:
1311:            /**
1312:             * Initializes all variables. Used after <tt>close()</tt> or <tt>disconnect()</tt>,
1313:             * to be ready for new <tt>connect()</tt>
1314:             */
1315:            private void init() {
1316:                local_addr = null;
1317:                cluster_name = null;
1318:                my_view = null;
1319:
1320:                // changed by Bela Sept 25 2003
1321:                //if(mq != null && mq.closed())
1322:                //  mq.reset();
1323:
1324:                connect_promise.reset();
1325:                disconnect_promise.reset();
1326:                connected = false;
1327:            }
1328:
1329:            /**
1330:             * health check.<BR>
1331:             * throws a ChannelNotConnected exception if the channel is not connected
1332:             */
1333:            protected void checkNotConnected()
1334:                    throws ChannelNotConnectedException {
1335:                if (!connected)
1336:                    throw new ChannelNotConnectedException();
1337:            }
1338:
1339:            /**
1340:             * health check<BR>
1341:             * throws a ChannelClosed exception if the channel is closed
1342:             */
1343:            protected void checkClosed() throws ChannelClosedException {
1344:                if (closed)
1345:                    throw new ChannelClosedException();
1346:            }
1347:
1348:            /**
1349:             * returns the value of the event<BR>
1350:             * These objects will be returned<BR>
1351:             * <PRE>
1352:             * <B>Event Type    - Return Type</B>
1353:             * Event.MSG           - returns a Message object
1354:             * Event.VIEW_CHANGE   - returns a View object
1355:             * Event.SUSPECT       - returns a SuspectEvent object
1356:             * Event.BLOCK         - returns a new BlockEvent object
1357:             * Event.GET_APPLSTATE - returns a GetStateEvent object
1358:             * Event.STATE_RECEIVED- returns a SetStateEvent object
1359:             * Event.Exit          - returns an ExitEvent object
1360:             * All other           - return the actual Event object
1361:             * </PRE>
1362:             * @param   evt - the event of which you want to extract the value
1363:             * @return the event value if it matches the select list,
1364:             *         returns null if the event is null
1365:             *         returns the event itself if a match (See above) can not be made of the event type
1366:             */
1367:            static Object getEvent(Event evt) {
1368:                if (evt == null)
1369:                    return null; // correct ?
1370:
1371:                switch (evt.getType()) {
1372:                case Event.MSG:
1373:                    return evt.getArg();
1374:                case Event.VIEW_CHANGE:
1375:                    return evt.getArg();
1376:                case Event.SUSPECT:
1377:                    return new SuspectEvent(evt.getArg());
1378:                case Event.BLOCK:
1379:                    return new BlockEvent();
1380:                case Event.UNBLOCK:
1381:                    return new UnblockEvent();
1382:                case Event.GET_APPLSTATE:
1383:                    StateTransferInfo info = (StateTransferInfo) evt.getArg();
1384:                    return new GetStateEvent(info.target, info.state_id);
1385:                case Event.STATE_RECEIVED:
1386:                    info = (StateTransferInfo) evt.getArg();
1387:                    return new SetStateEvent(info.state, info.state_id);
1388:                case Event.STATE_TRANSFER_OUTPUTSTREAM:
1389:                    info = (StateTransferInfo) evt.getArg();
1390:                    return new StreamingGetStateEvent(info.outputStream,
1391:                            info.state_id);
1392:                case Event.STATE_TRANSFER_INPUTSTREAM:
1393:                    info = (StateTransferInfo) evt.getArg();
1394:                    return new StreamingSetStateEvent(info.inputStream,
1395:                            info.state_id);
1396:                case Event.EXIT:
1397:                    return new ExitEvent();
1398:                default:
1399:                    return evt;
1400:                }
1401:            }
1402:
1403:            /**
1404:             * Receives the state from the group and modifies the JChannel.state object<br>
1405:             * This method initializes the local state variable to null, and then sends the state
1406:             * event down the stack. It waits for a GET_STATE_OK event to bounce back
1407:             * @param evt the get state event, has to be of type Event.GET_STATE
1408:             * @param info Information about the state transfer, e.g. target member and timeout
1409:             * @return true of the state was received, false if the operation timed out
1410:             */
1411:            private boolean _getState(Event evt, StateTransferInfo info)
1412:                    throws ChannelNotConnectedException, ChannelClosedException {
1413:                checkClosed();
1414:                checkNotConnected();
1415:                if (!state_transfer_supported) {
1416:                    throw new IllegalStateException(
1417:                            "fetching state will fail as state transfer is not supported. "
1418:                                    + "Add one of the STATE_TRANSFER protocols to your protocol configuration");
1419:                }
1420:
1421:                if (flush_supported)
1422:                    flush_unblock_promise.reset();
1423:
1424:                state_promise.reset();
1425:                down(evt);
1426:                Boolean state_transfer_successfull = (Boolean) state_promise
1427:                        .getResult(info.timeout);
1428:
1429:                //if FLUSH is used do not return from getState() until UNBLOCK event is received
1430:                boolean shouldWaitForUnblock = flush_supported
1431:                        && receive_blocks;
1432:                if (shouldWaitForUnblock) {
1433:                    try {
1434:                        flush_unblock_promise
1435:                                .getResultWithTimeout(FLUSH_UNBLOCK_TIMEOUT);
1436:                    } catch (TimeoutException te) {
1437:                        if (log.isWarnEnabled())
1438:                            log
1439:                                    .warn("Waiting on UNBLOCK after getState timed out");
1440:                    }
1441:                }
1442:
1443:                return state_transfer_successfull != null
1444:                        && state_transfer_successfull.booleanValue();
1445:            }
1446:
1447:            /**
1448:             * Disconnects and closes the channel.
1449:             * This method does the folloing things
1450:             * <ol>
1451:             * <li>Calls <code>this.disconnect</code> if the disconnect parameter is true
1452:             * <li>Calls <code>Queue.close</code> on mq if the close_mq parameter is true
1453:             * <li>Calls <code>ProtocolStack.stop</code> on the protocol stack
1454:             * <li>Calls <code>ProtocolStack.destroy</code> on the protocol stack
1455:             * <li>Sets the channel closed and channel connected flags to true and false
1456:             * <li>Notifies any channel listener of the channel close operation
1457:             * </ol>
1458:             */
1459:            protected void _close(boolean disconnect, boolean close_mq) {
1460:                if (closed)
1461:                    return;
1462:
1463:                if (disconnect)
1464:                    disconnect(); // leave group if connected
1465:
1466:                if (close_mq) {
1467:                    try {
1468:                        if (mq != null)
1469:                            mq.close(false); // closes and removes all messages
1470:                    } catch (Exception e) {
1471:                        if (log.isErrorEnabled())
1472:                            log.error("exception: " + e);
1473:                    }
1474:                }
1475:
1476:                if (prot_stack != null) {
1477:                    try {
1478:                        prot_stack.stopStack();
1479:                        prot_stack.destroy();
1480:                    } catch (Exception e) {
1481:                        if (log.isErrorEnabled())
1482:                            log
1483:                                    .error(
1484:                                            "failed destroying the protocol stack",
1485:                                            e);
1486:                    }
1487:                }
1488:                closed = true;
1489:                connected = false;
1490:                notifyChannelClosed(this );
1491:                init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
1492:            }
1493:
1494:            public final void closeMessageQueue(boolean flush_entries) {
1495:                if (mq != null)
1496:                    mq.close(flush_entries);
1497:            }
1498:
1499:            /**
1500:             * Creates a separate thread to close the protocol stack.
1501:             * This is needed because the thread that called JChannel.up() with the EXIT event would
1502:             * hang waiting for up() to return, while up() actually tries to kill that very thread.
1503:             * This way, we return immediately and allow the thread to terminate.
1504:             */
1505:            private void handleExit(Event evt) {
1506:                notifyChannelShunned();
1507:                if (closer != null && !closer.isAlive())
1508:                    closer = null;
1509:                if (closer == null) {
1510:                    if (log.isInfoEnabled())
1511:                        log
1512:                                .info("received an EXIT event, will leave the channel");
1513:                    closer = new CloserThread(evt);
1514:                    closer.start();
1515:                }
1516:            }
1517:
1518:            public boolean flushSupported() {
1519:                return flush_supported;
1520:            }
1521:
1522:            /**
1523:             * Will perform a flush of the system, ie. all pending messages are flushed out of the 
1524:             * system and all members ack their reception. After this call return, no member will 
1525:             * be sending any messages until {@link #stopFlush()} is called.
1526:             * <p>
1527:             * 
1528:             * In case of flush collisions random sleep time backoff algorithm is employed and 
1529:             * flush is reattempted for numberOfAttempts. Therefore this method is guaranteed 
1530:             * to return after timeout*numberOfAttempts miliseconds.
1531:             * 
1532:             * 
1533:             * @param timeout
1534:             * @param numberOfAttempts if flush was unsuccessful attempt again until numberOfAttempts is 0
1535:             * @param automatic_resume Call {@link #stopFlush()} after the flush
1536:             * @return true if FLUSH completed within the timeout
1537:             */
1538:            public boolean startFlush(long timeout, int numberOfAttempts,
1539:                    boolean automatic_resume) {
1540:                if (!flush_supported) {
1541:                    throw new IllegalStateException(
1542:                            "Flush is not supported, add pbcast.FLUSH protocol to your configuration");
1543:                }
1544:
1545:                boolean successfulFlush = false;
1546:                flush_promise.reset();
1547:                down(new Event(Event.SUSPEND));
1548:                try {
1549:                    Boolean r = null;
1550:                    if (flush_promise.hasResult()) {
1551:                        r = (Boolean) flush_promise.getResult();
1552:                        successfulFlush = r.booleanValue();
1553:                    } else {
1554:                        r = (Boolean) flush_promise
1555:                                .getResultWithTimeout(timeout);
1556:                        successfulFlush = r.booleanValue();
1557:                    }
1558:                } catch (TimeoutException e) {
1559:                    //it is normal to get timeouts - it is the final outcome that counts 
1560:                    //we will just retry below
1561:
1562:                    if (log.isInfoEnabled())
1563:                        log
1564:                                .info("JChannel.startFlush requested by "
1565:                                        + local_addr
1566:                                        + " timed out waiting for flush responses after "
1567:                                        + timeout + " msec");
1568:                }
1569:
1570:                if (!successfulFlush && numberOfAttempts > 0) {
1571:                    long backOffSleepTime = Util.random(5000);
1572:                    if (log.isInfoEnabled())
1573:                        log.info("Flush in progress detected at " + local_addr
1574:                                + ". Backing off for " + backOffSleepTime
1575:                                + " ms. Attempts left " + numberOfAttempts);
1576:
1577:                    Util.sleepRandom(backOffSleepTime);
1578:                    successfulFlush = startFlush(timeout, --numberOfAttempts,
1579:                            automatic_resume);
1580:                }
1581:
1582:                if (automatic_resume)
1583:                    stopFlush();
1584:
1585:                return successfulFlush;
1586:            }
1587:
1588:            /**
1589:             * Will perform a flush of the system, ie. all pending messages are flushed out of the 
1590:             * system and all members ack their reception. After this call return, no member will 
1591:             * be sending any messages until {@link #stopFlush()} is called.
1592:             * <p>
1593:             * 
1594:             * In case of flush collisions random sleep time backoff algorithm is employed and 
1595:             * flush is reattempted for a default of three times. Therefore this method is guaranteed 
1596:             * to return after timeout*3 miliseconds.
1597:             *     
1598:             * @param timeout
1599:             * @param automatic_resume Call {@link #stopFlush()} after the flush
1600:             * @return true if FLUSH completed within the timeout
1601:             */
1602:            public boolean startFlush(long timeout, boolean automatic_resume) {
1603:                int defaultNumberOfFlushAttempts = 3;
1604:                return startFlush(timeout, defaultNumberOfFlushAttempts,
1605:                        automatic_resume);
1606:            }
1607:
1608:            public void stopFlush() {
1609:                if (!flush_supported) {
1610:                    throw new IllegalStateException(
1611:                            "Flush is not supported, add pbcast.FLUSH protocol to your configuration");
1612:                }
1613:
1614:                flush_unblock_promise.reset();
1615:                down(new Event(Event.RESUME));
1616:
1617:                //do not return until UNBLOCK event is received        
1618:                boolean shouldWaitForUnblock = receive_blocks;
1619:                if (shouldWaitForUnblock) {
1620:                    try {
1621:                        flush_unblock_promise.getResultWithTimeout(5000);
1622:                    } catch (TimeoutException te) {
1623:                    }
1624:                }
1625:            }
1626:
1627:            Address determineCoordinator() {
1628:                Vector mbrs = my_view != null ? my_view.getMembers() : null;
1629:                if (mbrs == null)
1630:                    return null;
1631:                if (mbrs.size() > 0)
1632:                    return (Address) mbrs.firstElement();
1633:                return null;
1634:            }
1635:
1636:            /* ------------------------------- End of Private Methods ---------------------------------- */
1637:
1638:            class CloserThread extends Thread {
1639:                final Event evt;
1640:                final Thread t = null;
1641:
1642:                CloserThread(Event evt) {
1643:                    super (Util.getGlobalThreadGroup(), "CloserThread");
1644:                    this .evt = evt;
1645:                    setDaemon(true);
1646:                }
1647:
1648:                public void run() {
1649:                    try {
1650:                        String old_cluster_name = cluster_name; // remember because close() will null it
1651:                        if (log.isInfoEnabled())
1652:                            log.info("closing the channel");
1653:                        _close(false, false); // do not disconnect before closing channel, do not close mq (yet !)
1654:
1655:                        if (up_handler != null)
1656:                            up_handler.up(this .evt);
1657:                        else {
1658:                            try {
1659:                                if (receiver == null)
1660:                                    mq.add(this .evt);
1661:                            } catch (Exception ex) {
1662:                                if (log.isErrorEnabled())
1663:                                    log.error("exception: " + ex);
1664:                            }
1665:                        }
1666:
1667:                        if (mq != null) {
1668:                            Util.sleep(500); // give the mq thread a bit of time to deliver EXIT to the application
1669:                            try {
1670:                                mq.close(false);
1671:                            } catch (Exception ex) {
1672:                            }
1673:                        }
1674:
1675:                        if (auto_reconnect) {
1676:                            try {
1677:                                if (log.isInfoEnabled())
1678:                                    log.info("reconnecting to group "
1679:                                            + old_cluster_name);
1680:                                open();
1681:                            } catch (Exception ex) {
1682:                                if (log.isErrorEnabled())
1683:                                    log.error("failure reopening channel: "
1684:                                            + ex);
1685:                                return;
1686:                            }
1687:                            try {
1688:                                if (additional_data != null) {
1689:                                    // send previously set additional_data down the stack - other protocols (e.g. TP) use it
1690:                                    Map m = new HashMap(11);
1691:                                    m.put("additional_data", additional_data);
1692:                                    down(new Event(Event.CONFIG, m));
1693:                                }
1694:                                connect(old_cluster_name);
1695:                                notifyChannelReconnected(local_addr);
1696:                            } catch (Exception ex) {
1697:                                if (log.isErrorEnabled())
1698:                                    log
1699:                                            .error("failure reconnecting to channel: "
1700:                                                    + ex);
1701:                                return;
1702:                            }
1703:                        }
1704:
1705:                        if (auto_getstate) {
1706:                            if (log.isInfoEnabled())
1707:                                log
1708:                                        .info("fetching the state (auto_getstate=true)");
1709:                            boolean rc = JChannel.this .getState(null,
1710:                                    GET_STATE_DEFAULT_TIMEOUT);
1711:                            if (log.isInfoEnabled()) {
1712:                                if (rc)
1713:                                    log
1714:                                            .info("state was retrieved successfully");
1715:                                else
1716:                                    log.info("state transfer failed");
1717:                            }
1718:                        }
1719:
1720:                    } catch (Exception ex) {
1721:                        if (log.isErrorEnabled())
1722:                            log.error("exception: " + ex);
1723:                    } finally {
1724:                        closer = null;
1725:                    }
1726:                }
1727:            }
1728:
1729:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.