Source Code Cross Referenced for Protocol.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » stack » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.stack 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: Protocol.java,v 1.38.6.1 2007/04/27 08:03:57 belaban Exp $
002:
003:        package org.jgroups.stack;
004:
005:        import org.apache.commons.logging.Log;
006:        import org.apache.commons.logging.LogFactory;
007:        import org.jgroups.Event;
008:        import org.jgroups.util.Queue;
009:        import org.jgroups.util.QueueClosedException;
010:        import org.jgroups.util.Util;
011:
012:        import java.util.Map;
013:        import java.util.Properties;
014:        import java.util.Vector;
015:
016:        class UpHandler extends Thread {
017:            private Queue mq = null;
018:            private Protocol handler = null;
019:            private ProtocolObserver observer = null;
020:            protected final Log log = LogFactory.getLog(this .getClass());
021:
022:            public UpHandler(Queue mq, Protocol handler,
023:                    ProtocolObserver observer) {
024:                super (Util.getGlobalThreadGroup(), "UpHandler");
025:                this .mq = mq;
026:                this .handler = handler;
027:                this .observer = observer;
028:                if (handler != null)
029:                    setName("UpHandler (" + handler.getName() + ')');
030:                else
031:                    setName("UpHandler");
032:                setDaemon(true);
033:            }
034:
035:            public void setObserver(ProtocolObserver observer) {
036:                this .observer = observer;
037:            }
038:
039:            /** Removes events from mq and calls handler.up(evt) */
040:            public void run() {
041:                while (!mq.closed()) {
042:                    try {
043:                        Event evt = (Event) mq.remove();
044:                        if (evt == null) {
045:                            if (log.isWarnEnabled())
046:                                log.warn("removed null event");
047:                            continue;
048:                        }
049:
050:                        if (observer != null) { // call debugger hook (if installed)
051:                            if (observer.up(evt, mq.size()) == false) { // false means discard event
052:                                return;
053:                            }
054:                        }
055:                        handler.up(evt);
056:                    } catch (QueueClosedException queue_closed) {
057:                        break;
058:                    } catch (Throwable e) {
059:                        if (log.isErrorEnabled())
060:                            log.error(getName() + " caught exception", e);
061:                    }
062:                }
063:            }
064:
065:        }
066:
067:        class DownHandler extends Thread {
068:            private Queue mq = null;
069:            private Protocol handler = null;
070:            private ProtocolObserver observer = null;
071:            protected final Log log = LogFactory.getLog(this .getClass());
072:
073:            public DownHandler(Queue mq, Protocol handler,
074:                    ProtocolObserver observer) {
075:                super (Util.getGlobalThreadGroup(), "DownHandler");
076:                this .mq = mq;
077:                this .handler = handler;
078:                this .observer = observer;
079:                if (handler != null)
080:                    setName("DownHandler (" + handler.getName() + ')');
081:                else
082:                    setName("DownHandler");
083:                setDaemon(true);
084:            }
085:
086:            public void setObserver(ProtocolObserver observer) {
087:                this .observer = observer;
088:            }
089:
090:            /** Removes events from mq and calls handler.down(evt) */
091:            public void run() {
092:                while (!mq.closed()) {
093:                    try {
094:                        Event evt = (Event) mq.remove();
095:                        if (evt == null) {
096:                            if (log.isWarnEnabled())
097:                                log.warn("removed null event");
098:                            continue;
099:                        }
100:
101:                        if (observer != null) { // call debugger hook (if installed)
102:                            if (observer.down(evt, mq.size()) == false) { // false means discard event
103:                                continue;
104:                            }
105:                        }
106:
107:                        int type = evt.getType();
108:                        if (type == Event.START || type == Event.STOP) {
109:                            if (handler.handleSpecialDownEvent(evt) == false)
110:                                continue;
111:                        }
112:                        handler.down(evt);
113:                    } catch (QueueClosedException queue_closed) {
114:                        break;
115:                    } catch (Throwable e) {
116:                        if (log.isErrorEnabled())
117:                            log.error(getName() + " caught exception", e);
118:                    }
119:                }
120:            }
121:
122:        }
123:
124:        /**
125:         * The Protocol class provides a set of common services for protocol layers. Each layer has to
126:         * be a subclass of Protocol and override a number of methods (typically just <code>up()</code>,
127:         * <code>Down</code> and <code>getName</code>. Layers are stacked in a certain order to form
128:         * a protocol stack. <a href=org.jgroups.Event.html>Events</a> are passed from lower
129:         * layers to upper ones and vice versa. E.g. a Message received by the UDP layer at the bottom
130:         * will be passed to its higher layer as an Event. That layer will in turn pass the Event to
131:         * its layer and so on, until a layer handles the Message and sends a response or discards it,
132:         * the former resulting in another Event being passed down the stack.<p>
133:         * Each layer has 2 FIFO queues, one for up Events and one for down Events. When an Event is
134:         * received by a layer (calling the internal upcall <code>ReceiveUpEvent</code>), it is placed
135:         * in the up-queue where it will be retrieved by the up-handler thread which will invoke method
136:         * <code>Up</code> of the layer. The same applies for Events traveling down the stack. Handling
137:         * of the up-handler and down-handler threads and the 2 FIFO queues is donw by the Protocol
138:         * class, subclasses will almost never have to override this behavior.<p>
139:         * The important thing to bear in mind is that Events have to passed on between layers in FIFO
140:         * order which is guaranteed by the Protocol implementation and must be guranteed by subclasses
141:         * implementing their on Event queuing.<p>
142:         * <b>Note that each class implementing interface Protocol MUST provide an empty, public
143:         * constructor !</b>
144:         */
145:        public abstract class Protocol {
146:            protected final Properties props = new Properties();
147:            protected Protocol up_prot = null, down_prot = null;
148:            protected ProtocolStack stack = null;
149:            protected final Queue up_queue = new Queue();
150:            protected final Queue down_queue = new Queue();
151:            protected UpHandler up_handler = null;
152:            protected int up_thread_prio = -1;
153:            protected DownHandler down_handler = null;
154:            protected int down_thread_prio = -1;
155:            protected ProtocolObserver observer = null; // hook for debugger
156:            private final static long THREAD_JOIN_TIMEOUT = 1000;
157:            protected boolean down_thread = true; // determines whether the down_handler thread should be started
158:            protected boolean up_thread = true; // determines whether the up_handler thread should be started
159:            protected boolean stats = true; // determines whether to collect statistics (and expose them via JMX)
160:            protected final Log log = LogFactory.getLog(this .getClass());
161:
162:            /**
163:             * Configures the protocol initially. A configuration string consists of name=value
164:             * items, separated by a ';' (semicolon), e.g.:<pre>
165:             * "loopback=false;unicast_inport=4444"
166:             * </pre>
167:             */
168:            public boolean setProperties(Properties props) {
169:                if (props != null)
170:                    this .props.putAll(props);
171:                return true;
172:            }
173:
174:            /** Called by Configurator. Removes 2 properties which are used by the Protocol directly and then
175:             *	calls setProperties(), which might invoke the setProperties() method of the actual protocol instance.
176:             */
177:            public boolean setPropertiesInternal(Properties props) {
178:                this .props.putAll(props);
179:
180:                String str = props.getProperty("down_thread");
181:                if (str != null) {
182:                    down_thread = Boolean.valueOf(str).booleanValue();
183:                    props.remove("down_thread");
184:                }
185:
186:                str = props.getProperty("down_thread_prio");
187:                if (str != null) {
188:                    down_thread_prio = Integer.parseInt(str);
189:                    props.remove("down_thread_prio");
190:                }
191:
192:                str = props.getProperty("up_thread");
193:                if (str != null) {
194:                    up_thread = Boolean.valueOf(str).booleanValue();
195:                    props.remove("up_thread");
196:                }
197:
198:                str = props.getProperty("up_thread_prio");
199:                if (str != null) {
200:                    up_thread_prio = Integer.parseInt(str);
201:                    props.remove("up_thread_prio");
202:                }
203:
204:                str = props.getProperty("stats");
205:                if (str != null) {
206:                    stats = Boolean.valueOf(str).booleanValue();
207:                    props.remove("stats");
208:                }
209:
210:                return setProperties(props);
211:            }
212:
213:            public Properties getProperties() {
214:                return props;
215:            }
216:
217:            public boolean upThreadEnabled() {
218:                return up_thread;
219:            }
220:
221:            public boolean downThreadEnabled() {
222:                return down_thread;
223:            }
224:
225:            public boolean statsEnabled() {
226:                return stats;
227:            }
228:
229:            public void enableStats(boolean flag) {
230:                stats = flag;
231:            }
232:
233:            public void resetStats() {
234:                ;
235:            }
236:
237:            public String printStats() {
238:                return null;
239:            }
240:
241:            public Map dumpStats() {
242:                return null;
243:            }
244:
245:            public void setObserver(ProtocolObserver observer) {
246:                this .observer = observer;
247:                observer.setProtocol(this );
248:                if (up_handler != null)
249:                    up_handler.setObserver(observer);
250:                if (down_handler != null)
251:                    down_handler.setObserver(observer);
252:            }
253:
254:            /**
255:             * Called after instance has been created (null constructor) and before protocol is started.
256:             * Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
257:             * @exception Exception Thrown if protocol cannot be initialized successfully. This will cause the
258:             *                      ProtocolStack to fail, so the channel constructor will throw an exception
259:             */
260:            public void init() throws Exception {
261:            }
262:
263:            /**
264:             * This method is called on a {@link org.jgroups.Channel#connect(String)}. Starts work.
265:             * Protocols are connected and queues are ready to receive events.
266:             * Will be called <em>from bottom to top</em>. This call will replace
267:             * the <b>START</b> and <b>START_OK</b> events.
268:             * @exception Exception Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
269:             *                      to fail, so {@link org.jgroups.Channel#connect(String)} will throw an exception
270:             */
271:            public void start() throws Exception {
272:            }
273:
274:            /**
275:             * This method is called on a {@link org.jgroups.Channel#disconnect()}. Stops work (e.g. by closing multicast socket).
276:             * Will be called <em>from top to bottom</em>. This means that at the time of the method invocation the
277:             * neighbor protocol below is still working. This method will replace the
278:             * <b>STOP</b>, <b>STOP_OK</b>, <b>CLEANUP</b> and <b>CLEANUP_OK</b> events. The ProtocolStack guarantees that
279:             * when this method is called all messages in the down queue will have been flushed
280:             */
281:            public void stop() {
282:            }
283:
284:            /**
285:             * This method is called on a {@link org.jgroups.Channel#close()}.
286:             * Does some cleanup; after the call the VM will terminate
287:             */
288:            public void destroy() {
289:            }
290:
291:            public Queue getUpQueue() {
292:                return up_queue;
293:            } // used by Debugger (ProtocolView)
294:
295:            public Queue getDownQueue() {
296:                return down_queue;
297:            } // used by Debugger (ProtocolView)
298:
299:            /** List of events that are required to be answered by some layer above.
300:             @return Vector (of Integers) */
301:            public Vector requiredUpServices() {
302:                return null;
303:            }
304:
305:            /** List of events that are required to be answered by some layer below.
306:             @return Vector (of Integers) */
307:            public Vector requiredDownServices() {
308:                return null;
309:            }
310:
311:            /** List of events that are provided to layers above (they will be handled when sent down from
312:             above).
313:             @return Vector (of Integers) */
314:            public Vector providedUpServices() {
315:                return null;
316:            }
317:
318:            /** List of events that are provided to layers below (they will be handled when sent down from
319:             below).
320:             @return Vector (of Integers) */
321:            public Vector providedDownServices() {
322:                return null;
323:            }
324:
325:            public abstract String getName(); // all protocol names have to be unique !
326:
327:            public Protocol getUpProtocol() {
328:                return up_prot;
329:            }
330:
331:            public Protocol getDownProtocol() {
332:                return down_prot;
333:            }
334:
335:            public void setUpProtocol(Protocol up_prot) {
336:                this .up_prot = up_prot;
337:            }
338:
339:            public void setDownProtocol(Protocol down_prot) {
340:                this .down_prot = down_prot;
341:            }
342:
343:            public void setProtocolStack(ProtocolStack stack) {
344:                this .stack = stack;
345:            }
346:
347:            /** Used internally. If overridden, call this method first. Only creates the up_handler thread
348:             if down_thread is true */
349:            public void startUpHandler() {
350:                if (up_thread) {
351:                    if (up_handler == null) {
352:                        up_handler = new UpHandler(up_queue, this , observer);
353:                        if (up_thread_prio >= 0) {
354:                            try {
355:                                up_handler.setPriority(up_thread_prio);
356:                            } catch (Throwable t) {
357:                                if (log.isErrorEnabled())
358:                                    log
359:                                            .error(
360:                                                    "priority "
361:                                                            + up_thread_prio
362:                                                            + " could not be set for thread",
363:                                                    t);
364:                            }
365:                        }
366:                        up_handler.start();
367:                    }
368:                }
369:            }
370:
371:            /** Used internally. If overridden, call this method first. Only creates the down_handler thread
372:             if down_thread is true */
373:            public void startDownHandler() {
374:                if (down_thread) {
375:                    if (down_handler == null) {
376:                        down_handler = new DownHandler(down_queue, this ,
377:                                observer);
378:                        if (down_thread_prio >= 0) {
379:                            try {
380:                                down_handler.setPriority(down_thread_prio);
381:                            } catch (Throwable t) {
382:                                if (log.isErrorEnabled())
383:                                    log
384:                                            .error(
385:                                                    "priority "
386:                                                            + down_thread_prio
387:                                                            + " could not be set for thread",
388:                                                    t);
389:                            }
390:                        }
391:                        down_handler.start();
392:                    }
393:                }
394:            }
395:
396:            /** Used internally. If overridden, call parent's method first */
397:            public void stopInternal() {
398:                up_queue.close(false); // this should terminate up_handler thread
399:
400:                if (up_handler != null && up_handler.isAlive()) {
401:                    try {
402:                        up_handler.join(THREAD_JOIN_TIMEOUT);
403:                    } catch (Exception ex) {
404:                    }
405:                    if (up_handler != null && up_handler.isAlive()) {
406:                        up_handler.interrupt(); // still alive ? let's just kill it without mercy...
407:                        try {
408:                            up_handler.join(THREAD_JOIN_TIMEOUT);
409:                        } catch (Exception ex) {
410:                        }
411:                        if (up_handler != null && up_handler.isAlive())
412:                            if (log.isErrorEnabled())
413:                                log
414:                                        .error("up_handler thread for "
415:                                                + getName()
416:                                                + " was interrupted (in order to be terminated), but is still alive");
417:                    }
418:                }
419:                up_handler = null;
420:
421:                down_queue.close(false); // this should terminate down_handler thread
422:                if (down_handler != null && down_handler.isAlive()) {
423:                    try {
424:                        down_handler.join(THREAD_JOIN_TIMEOUT);
425:                    } catch (Exception ex) {
426:                    }
427:                    if (down_handler != null && down_handler.isAlive()) {
428:                        down_handler.interrupt(); // still alive ? let's just kill it without mercy...
429:                        try {
430:                            down_handler.join(THREAD_JOIN_TIMEOUT);
431:                        } catch (Exception ex) {
432:                        }
433:                        if (down_handler != null && down_handler.isAlive())
434:                            if (log.isErrorEnabled())
435:                                log
436:                                        .error("down_handler thread for "
437:                                                + getName()
438:                                                + " was interrupted (in order to be terminated), but is is still alive");
439:                    }
440:                }
441:                down_handler = null;
442:            }
443:
444:            /**
445:             * Internal method, should not be called by clients. Used by ProtocolStack. I would have
446:             * used the 'friends' modifier, but this is available only in C++ ... If the up_handler thread
447:             * is not available (down_thread == false), then directly call the up() method: we will run on the
448:             * caller's thread (e.g. the protocol layer below us).
449:             */
450:            protected void receiveUpEvent(Event evt) {
451:                if (up_handler == null) {
452:                    if (observer != null) { // call debugger hook (if installed)
453:                        if (observer.up(evt, up_queue.size()) == false) { // false means discard event
454:                            return;
455:                        }
456:                    }
457:                    up(evt);
458:                    return;
459:                }
460:                try {
461:                    up_queue.add(evt);
462:                } catch (Exception e) {
463:                    if (log.isWarnEnabled())
464:                        log.warn("exception: " + e);
465:                }
466:            }
467:
468:            /**
469:             * Internal method, should not be called by clients. Used by ProtocolStack. I would have
470:             * used the 'friends' modifier, but this is available only in C++ ... If the down_handler thread
471:             * is not available (down_thread == false), then directly call the down() method: we will run on the
472:             * caller's thread (e.g. the protocol layer above us).
473:             */
474:            protected void receiveDownEvent(Event evt) {
475:                if (down_handler == null) {
476:                    if (observer != null) { // call debugger hook (if installed)
477:                        if (observer.down(evt, down_queue.size()) == false) { // false means discard event
478:                            return;
479:                        }
480:                    }
481:                    int type = evt.getType();
482:                    if (type == Event.START || type == Event.STOP) {
483:                        if (handleSpecialDownEvent(evt) == false)
484:                            return;
485:                    }
486:                    down(evt);
487:                    return;
488:                }
489:                try {
490:                    down_queue.add(evt);
491:                } catch (Exception e) {
492:                    if (log.isWarnEnabled())
493:                        log.warn("exception: " + e);
494:                }
495:            }
496:
497:            /**
498:             * Causes the event to be forwarded to the next layer up in the hierarchy. Typically called
499:             * by the implementation of <code>Up</code> (when done).
500:             */
501:            public void passUp(Event evt) {
502:                if (observer != null) { // call debugger hook (if installed)
503:                    if (observer.passUp(evt) == false) { // false means don't pass up (=discard) event
504:                        return;
505:                    }
506:                }
507:                up_prot.receiveUpEvent(evt);
508:            }
509:
510:            /**
511:             * Causes the event to be forwarded to the next layer down in the hierarchy.Typically called
512:             * by the implementation of <code>Down</code> (when done).
513:             */
514:            public void passDown(Event evt) {
515:                if (observer != null) { // call debugger hook (if installed)
516:                    if (observer.passDown(evt) == false) { // false means don't pass down (=discard) event
517:                        return;
518:                    }
519:                }
520:                down_prot.receiveDownEvent(evt);
521:            }
522:
523:            /**
524:             * An event was received from the layer below. Usually the current layer will want to examine
525:             * the event type and - depending on its type - perform some computation
526:             * (e.g. removing headers from a MSG event type, or updating the internal membership list
527:             * when receiving a VIEW_CHANGE event).
528:             * Finally the event is either a) discarded, or b) an event is sent down
529:             * the stack using <code>passDown()</code> or c) the event (or another event) is sent up
530:             * the stack using <code>passUp()</code>.
531:             */
532:            public void up(Event evt) {
533:                passUp(evt);
534:            }
535:
536:            /**
537:             * An event is to be sent down the stack. The layer may want to examine its type and perform
538:             * some action on it, depending on the event's type. If the event is a message MSG, then
539:             * the layer may need to add a header to it (or do nothing at all) before sending it down
540:             * the stack using <code>passDown()</code>. In case of a GET_ADDRESS event (which tries to
541:             * retrieve the stack's address from one of the bottom layers), the layer may need to send
542:             * a new response event back up the stack using <code>passUp()</code>.
543:             */
544:            public void down(Event evt) {
545:                passDown(evt);
546:            }
547:
548:            /**  These are special internal events that should not be handled by protocols
549:             * @return boolean True: the event should be passed further down the stack. False: the event should
550:             * be discarded (not passed down the stack)
551:             */
552:            protected boolean handleSpecialDownEvent(Event evt) {
553:                switch (evt.getType()) {
554:                case Event.START:
555:                    try {
556:                        start();
557:
558:                        // if we're the transport protocol, reply with a START_OK up the stack
559:                        if (down_prot == null) {
560:                            passUp(new Event(Event.START_OK, Boolean.TRUE));
561:                            return false; // don't pass down the stack
562:                        } else
563:                            return true; // pass down the stack
564:                    } catch (Exception e) {
565:                        passUp(new Event(Event.START_OK,
566:                                new Exception("exception caused by "
567:                                        + getName() + ".start()", e)));
568:                        return false;
569:                    }
570:                case Event.STOP:
571:                    stop();
572:                    if (down_prot == null) {
573:                        passUp(new Event(Event.STOP_OK, Boolean.TRUE));
574:                        return false; // don't pass down the stack
575:                    } else
576:                        return true; // pass down the stack
577:                default:
578:                    return true; // pass down by default
579:                }
580:            }
581:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.