Source Code Cross Referenced for GroupChannel.java in  » Sevlet-Container » apache-tomcat-6.0.14 » org » apache » catalina » tribes » group » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Sevlet Container » apache tomcat 6.0.14 » org.apache.catalina.tribes.group 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Licensed to the Apache Software Foundation (ASF) under one or more
003:         * contributor license agreements.  See the NOTICE file distributed with
004:         * this work for additional information regarding copyright ownership.
005:         * The ASF licenses this file to You under the Apache License, Version 2.0
006:         * (the "License"); you may not use this file except in compliance with
007:         * the License.  You may obtain a copy of the License at
008:         *
009:         *      http://www.apache.org/licenses/LICENSE-2.0
010:         *
011:         * Unless required by applicable law or agreed to in writing, software
012:         * distributed under the License is distributed on an "AS IS" BASIS,
013:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014:         * See the License for the specific language governing permissions and
015:         * limitations under the License.
016:         */
017:        package org.apache.catalina.tribes.group;
018:
019:        import java.io.Serializable;
020:        import java.util.ArrayList;
021:        import java.util.Iterator;
022:
023:        import org.apache.catalina.tribes.ByteMessage;
024:        import org.apache.catalina.tribes.Channel;
025:        import org.apache.catalina.tribes.ChannelException;
026:        import org.apache.catalina.tribes.ChannelInterceptor;
027:        import org.apache.catalina.tribes.ChannelListener;
028:        import org.apache.catalina.tribes.ChannelMessage;
029:        import org.apache.catalina.tribes.ChannelReceiver;
030:        import org.apache.catalina.tribes.ChannelSender;
031:        import org.apache.catalina.tribes.ErrorHandler;
032:        import org.apache.catalina.tribes.ManagedChannel;
033:        import org.apache.catalina.tribes.Member;
034:        import org.apache.catalina.tribes.MembershipListener;
035:        import org.apache.catalina.tribes.MembershipService;
036:        import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
037:        import org.apache.catalina.tribes.io.ChannelData;
038:        import org.apache.catalina.tribes.io.XByteBuffer;
039:        import org.apache.catalina.tribes.UniqueId;
040:        import org.apache.catalina.tribes.Heartbeat;
041:        import org.apache.catalina.tribes.io.BufferPool;
042:        import org.apache.catalina.tribes.RemoteProcessException;
043:        import org.apache.catalina.tribes.util.Logs;
044:        import org.apache.catalina.tribes.util.Arrays;
045:
046:        /**
047:         * The default implementation of a Channel.<br>
048:         * The GroupChannel manages the replication channel. It coordinates
049:         * message being sent and received with membership announcements.
050:         * The channel has an chain of interceptors that can modify the message or perform other logic.<br>
051:         * It manages a complete group, both membership and replication.
052:         * @author Filip Hanik
053:         * @version $Revision: 500684 $, $Date: 2007-01-28 00:27:18 +0100 (dim., 28 janv. 2007) $
054:         */
055:        public class GroupChannel extends ChannelInterceptorBase implements 
056:                ManagedChannel {
057:            /**
058:             * Flag to determine if the channel manages its own heartbeat
059:             * If set to true, the channel will start a local thread for the heart beat.
060:             */
061:            protected boolean heartbeat = true;
062:            /**
063:             * If <code>heartbeat == true</code> then how often do we want this
064:             * heartbeat to run. default is one minute
065:             */
066:            protected long heartbeatSleeptime = 5 * 1000;//every 5 seconds
067:
068:            /**
069:             * Internal heartbeat thread
070:             */
071:            protected HeartbeatThread hbthread = null;
072:
073:            /**
074:             * The  <code>ChannelCoordinator</code> coordinates the bottom layer components:<br>
075:             * - MembershipService<br>
076:             * - ChannelSender <br>
077:             * - ChannelReceiver<br>
078:             */
079:            protected ChannelCoordinator coordinator = new ChannelCoordinator();
080:
081:            /**
082:             * The first interceptor in the inteceptor stack.
083:             * The interceptors are chained in a linked list, so we only need a reference to the
084:             * first one
085:             */
086:            protected ChannelInterceptor interceptors = null;
087:
088:            /**
089:             * A list of membership listeners that subscribe to membership announcements
090:             */
091:            protected ArrayList membershipListeners = new ArrayList();
092:
093:            /**
094:             * A list of channel listeners that subscribe to incoming messages
095:             */
096:            protected ArrayList channelListeners = new ArrayList();
097:
098:            /**
099:             * If set to true, the GroupChannel will check to make sure that
100:             */
101:            protected boolean optionCheck = false;
102:
103:            /**
104:             * Creates a GroupChannel. This constructor will also
105:             * add the first interceptor in the GroupChannel.<br>
106:             * The first interceptor is always the channel itself.
107:             */
108:            public GroupChannel() {
109:                addInterceptor(this );
110:            }
111:
112:            /**
113:             * Adds an interceptor to the stack for message processing<br>
114:             * Interceptors are ordered in the way they are added.<br>
115:             * <code>channel.addInterceptor(A);</code><br>
116:             * <code>channel.addInterceptor(C);</code><br>
117:             * <code>channel.addInterceptor(B);</code><br>
118:             * Will result in a interceptor stack like this:<br>
119:             * <code>A -> C -> B</code><br>
120:             * The complete stack will look like this:<br>
121:             * <code>Channel -> A -> C -> B -> ChannelCoordinator</code><br>
122:             * @param interceptor ChannelInterceptorBase
123:             */
124:            public void addInterceptor(ChannelInterceptor interceptor) {
125:                if (interceptors == null) {
126:                    interceptors = interceptor;
127:                    interceptors.setNext(coordinator);
128:                    interceptors.setPrevious(null);
129:                    coordinator.setPrevious(interceptors);
130:                } else {
131:                    ChannelInterceptor last = interceptors;
132:                    while (last.getNext() != coordinator) {
133:                        last = last.getNext();
134:                    }
135:                    last.setNext(interceptor);
136:                    interceptor.setNext(coordinator);
137:                    interceptor.setPrevious(last);
138:                    coordinator.setPrevious(interceptor);
139:                }
140:            }
141:
142:            /**
143:             * Sends a heartbeat through the interceptor stack.<br>
144:             * Invoke this method from the application on a periodic basis if
145:             * you have turned off internal heartbeats <code>channel.setHeartbeat(false)</code>
146:             */
147:            public void heartbeat() {
148:                super .heartbeat();
149:                Iterator i = membershipListeners.iterator();
150:                while (i.hasNext()) {
151:                    Object o = i.next();
152:                    if (o instanceof  Heartbeat)
153:                        ((Heartbeat) o).heartbeat();
154:                }
155:                i = channelListeners.iterator();
156:                while (i.hasNext()) {
157:                    Object o = i.next();
158:                    if (o instanceof  Heartbeat)
159:                        ((Heartbeat) o).heartbeat();
160:                }
161:
162:            }
163:
164:            /**
165:             * Send a message to the destinations specified
166:             * @param destination Member[] - destination.length > 1
167:             * @param msg Serializable - the message to send
168:             * @param options int - sender options, options can trigger guarantee levels and different interceptors to
169:             * react to the message see class documentation for the <code>Channel</code> object.<br>
170:             * @return UniqueId - the unique Id that was assigned to this message
171:             * @throws ChannelException - if an error occurs processing the message
172:             * @see org.apache.catalina.tribes.Channel
173:             */
174:            public UniqueId send(Member[] destination, Serializable msg,
175:                    int options) throws ChannelException {
176:                return send(destination, msg, options, null);
177:            }
178:
179:            /**
180:             *
181:             * @param destination Member[] - destination.length > 1
182:             * @param msg Serializable - the message to send
183:             * @param options int - sender options, options can trigger guarantee levels and different interceptors to
184:             * react to the message see class documentation for the <code>Channel</code> object.<br>
185:             * @param handler - callback object for error handling and completion notification, used when a message is
186:             * sent asynchronously using the <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag enabled.
187:             * @return UniqueId - the unique Id that was assigned to this message
188:             * @throws ChannelException - if an error occurs processing the message
189:             * @see org.apache.catalina.tribes.Channel
190:             */
191:            public UniqueId send(Member[] destination, Serializable msg,
192:                    int options, ErrorHandler handler) throws ChannelException {
193:                if (msg == null)
194:                    throw new ChannelException("Cant send a NULL message");
195:                XByteBuffer buffer = null;
196:                try {
197:                    if (destination == null || destination.length == 0)
198:                        throw new ChannelException("No destination given");
199:                    ChannelData data = new ChannelData(true);//generates a unique Id
200:                    data.setAddress(getLocalMember(false));
201:                    data.setTimestamp(System.currentTimeMillis());
202:                    byte[] b = null;
203:                    if (msg instanceof  ByteMessage) {
204:                        b = ((ByteMessage) msg).getMessage();
205:                        options = options | SEND_OPTIONS_BYTE_MESSAGE;
206:                    } else {
207:                        b = XByteBuffer.serialize(msg);
208:                        options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
209:                    }
210:                    data.setOptions(options);
211:                    //XByteBuffer buffer = new XByteBuffer(b.length+128,false);
212:                    buffer = BufferPool.getBufferPool().getBuffer(
213:                            b.length + 128, false);
214:                    buffer.append(b, 0, b.length);
215:                    data.setMessage(buffer);
216:                    InterceptorPayload payload = null;
217:                    if (handler != null) {
218:                        payload = new InterceptorPayload();
219:                        payload.setErrorHandler(handler);
220:                    }
221:                    getFirstInterceptor().sendMessage(destination, data,
222:                            payload);
223:                    if (Logs.MESSAGES.isTraceEnabled()) {
224:                        Logs.MESSAGES.trace("GroupChannel - Sent msg:"
225:                                + new UniqueId(data.getUniqueId())
226:                                + " at "
227:                                + new java.sql.Timestamp(System
228:                                        .currentTimeMillis()) + " to "
229:                                + Arrays.toNameString(destination));
230:                        Logs.MESSAGES.trace("GroupChannel - Send Message:"
231:                                + new UniqueId(data.getUniqueId()) + " is "
232:                                + msg);
233:                    }
234:
235:                    return new UniqueId(data.getUniqueId());
236:                } catch (Exception x) {
237:                    if (x instanceof  ChannelException)
238:                        throw (ChannelException) x;
239:                    throw new ChannelException(x);
240:                } finally {
241:                    if (buffer != null)
242:                        BufferPool.getBufferPool().returnBuffer(buffer);
243:                }
244:            }
245:
246:            /**
247:             * Callback from the interceptor stack. <br>
248:             * When a message is received from a remote node, this method will be invoked by
249:             * the previous interceptor.<br>
250:             * This method can also be used to send a message to other components within the same application,
251:             * but its an extreme case, and you're probably better off doing that logic between the applications itself.
252:             * @param msg ChannelMessage
253:             */
254:            public void messageReceived(ChannelMessage msg) {
255:                if (msg == null)
256:                    return;
257:                try {
258:                    if (Logs.MESSAGES.isTraceEnabled()) {
259:                        Logs.MESSAGES.trace("GroupChannel - Received msg:"
260:                                + new UniqueId(msg.getUniqueId())
261:                                + " at "
262:                                + new java.sql.Timestamp(System
263:                                        .currentTimeMillis()) + " from "
264:                                + msg.getAddress().getName());
265:                    }
266:
267:                    Serializable fwd = null;
268:                    if ((msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE) {
269:                        fwd = new ByteMessage(msg.getMessage().getBytes());
270:                    } else {
271:                        fwd = XByteBuffer.deserialize(msg.getMessage()
272:                                .getBytesDirect(), 0, msg.getMessage()
273:                                .getLength());
274:                    }
275:                    if (Logs.MESSAGES.isTraceEnabled()) {
276:                        Logs.MESSAGES.trace("GroupChannel - Receive Message:"
277:                                + new UniqueId(msg.getUniqueId()) + " is "
278:                                + fwd);
279:                    }
280:
281:                    //get the actual member with the correct alive time
282:                    Member source = msg.getAddress();
283:                    boolean rx = false;
284:                    boolean delivered = false;
285:                    for (int i = 0; i < channelListeners.size(); i++) {
286:                        ChannelListener channelListener = (ChannelListener) channelListeners
287:                                .get(i);
288:                        if (channelListener != null
289:                                && channelListener.accept(fwd, source)) {
290:                            channelListener.messageReceived(fwd, source);
291:                            delivered = true;
292:                            //if the message was accepted by an RPC channel, that channel
293:                            //is responsible for returning the reply, otherwise we send an absence reply
294:                            if (channelListener instanceof  RpcChannel)
295:                                rx = true;
296:                        }
297:                    }//for
298:                    if ((!rx) && (fwd instanceof  RpcMessage)) {
299:                        //if we have a message that requires a response,
300:                        //but none was given, send back an immediate one
301:                        sendNoRpcChannelReply((RpcMessage) fwd, source);
302:                    }
303:                    if (Logs.MESSAGES.isTraceEnabled()) {
304:                        Logs.MESSAGES.trace("GroupChannel delivered["
305:                                + delivered + "] id:"
306:                                + new UniqueId(msg.getUniqueId()));
307:                    }
308:
309:                } catch (Exception x) {
310:                    if (log.isDebugEnabled())
311:                        log.error("Unable to process channel:IOException.", x);
312:                    throw new RemoteProcessException("IOException:"
313:                            + x.getMessage(), x);
314:                }
315:            }
316:
317:            /**
318:             * Sends a <code>NoRpcChannelReply</code> message to a member<br>
319:             * This method gets invoked by the channel if a RPC message comes in
320:             * and no channel listener accepts the message. This avoids timeout
321:             * @param msg RpcMessage
322:             * @param destination Member - the destination for the reply
323:             */
324:            protected void sendNoRpcChannelReply(RpcMessage msg,
325:                    Member destination) {
326:                try {
327:                    //avoid circular loop
328:                    if (msg instanceof  RpcMessage.NoRpcChannelReply)
329:                        return;
330:                    RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(
331:                            msg.rpcId, msg.uuid);
332:                    send(new Member[] { destination }, reply,
333:                            Channel.SEND_OPTIONS_ASYNCHRONOUS);
334:                } catch (Exception x) {
335:                    log
336:                            .error(
337:                                    "Unable to find rpc channel, failed to send NoRpcChannelReply.",
338:                                    x);
339:                }
340:            }
341:
342:            /**
343:             * memberAdded gets invoked by the interceptor below the channel
344:             * and the channel will broadcast it to the membership listeners
345:             * @param member Member - the new member
346:             */
347:            public void memberAdded(Member member) {
348:                //notify upwards
349:                for (int i = 0; i < membershipListeners.size(); i++) {
350:                    MembershipListener membershipListener = (MembershipListener) membershipListeners
351:                            .get(i);
352:                    if (membershipListener != null)
353:                        membershipListener.memberAdded(member);
354:                }
355:            }
356:
357:            /**
358:             * memberDisappeared gets invoked by the interceptor below the channel
359:             * and the channel will broadcast it to the membership listeners
360:             * @param member Member - the member that left or crashed
361:             */
362:            public void memberDisappeared(Member member) {
363:                //notify upwards
364:                for (int i = 0; i < membershipListeners.size(); i++) {
365:                    MembershipListener membershipListener = (MembershipListener) membershipListeners
366:                            .get(i);
367:                    if (membershipListener != null)
368:                        membershipListener.memberDisappeared(member);
369:                }
370:            }
371:
372:            /**
373:             * Sets up the default implementation interceptor stack
374:             * if no interceptors have been added
375:             * @throws ChannelException
376:             */
377:            protected synchronized void setupDefaultStack()
378:                    throws ChannelException {
379:
380:                if (getFirstInterceptor() != null
381:                        && ((getFirstInterceptor().getNext() instanceof  ChannelCoordinator))) {
382:                    ChannelInterceptor interceptor = null;
383:                    Class clazz = null;
384:                    try {
385:                        clazz = Class
386:                                .forName(
387:                                        "org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor",
388:                                        true, GroupChannel.class
389:                                                .getClassLoader());
390:                        clazz.newInstance();
391:                    } catch (Throwable x) {
392:                        clazz = MessageDispatchInterceptor.class;
393:                    }//catch
394:                    try {
395:                        interceptor = (ChannelInterceptor) clazz.newInstance();
396:                    } catch (Exception x) {
397:                        throw new ChannelException(
398:                                "Unable to add MessageDispatchInterceptor to interceptor chain.",
399:                                x);
400:                    }
401:                    this .addInterceptor(interceptor);
402:                }
403:            }
404:
405:            /**
406:             * Validates the option flags that each interceptor is using and reports
407:             * an error if two interceptor share the same flag.
408:             * @throws ChannelException
409:             */
410:            protected void checkOptionFlags() throws ChannelException {
411:                StringBuffer conflicts = new StringBuffer();
412:                ChannelInterceptor first = interceptors;
413:                while (first != null) {
414:                    int flag = first.getOptionFlag();
415:                    if (flag != 0) {
416:                        ChannelInterceptor next = first.getNext();
417:                        while (next != null) {
418:                            int nflag = next.getOptionFlag();
419:                            if (nflag != 0
420:                                    && (((flag & nflag) == flag) || ((flag & nflag) == nflag))) {
421:                                conflicts.append("[");
422:                                conflicts.append(first.getClass().getName());
423:                                conflicts.append(":");
424:                                conflicts.append(flag);
425:                                conflicts.append(" == ");
426:                                conflicts.append(next.getClass().getName());
427:                                conflicts.append(":");
428:                                conflicts.append(nflag);
429:                                conflicts.append("] ");
430:                            }//end if
431:                            next = next.getNext();
432:                        }//while
433:                    }//end if
434:                    first = first.getNext();
435:                }//while
436:                if (conflicts.length() > 0)
437:                    throw new ChannelException(
438:                            "Interceptor option flag conflict: "
439:                                    + conflicts.toString());
440:
441:            }
442:
443:            /**
444:             * Starts the channel
445:             * @param svc int - what service to start
446:             * @throws ChannelException
447:             * @see org.apache.catalina.tribes.Channel#start(int)
448:             */
449:            public synchronized void start(int svc) throws ChannelException {
450:                setupDefaultStack();
451:                if (optionCheck)
452:                    checkOptionFlags();
453:                super .start(svc);
454:                if (hbthread == null && heartbeat) {
455:                    hbthread = new HeartbeatThread(this , heartbeatSleeptime);
456:                    hbthread.start();
457:                }
458:            }
459:
460:            /**
461:             * Stops the channel
462:             * @param svc int
463:             * @throws ChannelException
464:             * @see org.apache.catalina.tribes.Channel#stop(int)
465:             */
466:            public synchronized void stop(int svc) throws ChannelException {
467:                if (hbthread != null) {
468:                    hbthread.stopHeartbeat();
469:                    hbthread = null;
470:                }
471:                super .stop(svc);
472:            }
473:
474:            /**
475:             * Returns the first interceptor of the stack. Useful for traversal.
476:             * @return ChannelInterceptor
477:             */
478:            public ChannelInterceptor getFirstInterceptor() {
479:                if (interceptors != null)
480:                    return interceptors;
481:                else
482:                    return coordinator;
483:            }
484:
485:            /**
486:             * Returns the channel receiver component
487:             * @return ChannelReceiver
488:             */
489:            public ChannelReceiver getChannelReceiver() {
490:                return coordinator.getClusterReceiver();
491:            }
492:
493:            /**
494:             * Returns the channel sender component
495:             * @return ChannelSender
496:             */
497:            public ChannelSender getChannelSender() {
498:                return coordinator.getClusterSender();
499:            }
500:
501:            /**
502:             * Returns the membership service component
503:             * @return MembershipService
504:             */
505:            public MembershipService getMembershipService() {
506:                return coordinator.getMembershipService();
507:            }
508:
509:            /**
510:             * Sets the channel receiver component
511:             * @param clusterReceiver ChannelReceiver
512:             */
513:            public void setChannelReceiver(ChannelReceiver clusterReceiver) {
514:                coordinator.setClusterReceiver(clusterReceiver);
515:            }
516:
517:            /**
518:             * Sets the channel sender component
519:             * @param clusterSender ChannelSender
520:             */
521:            public void setChannelSender(ChannelSender clusterSender) {
522:                coordinator.setClusterSender(clusterSender);
523:            }
524:
525:            /**
526:             * Sets the membership component
527:             * @param membershipService MembershipService
528:             */
529:            public void setMembershipService(MembershipService membershipService) {
530:                coordinator.setMembershipService(membershipService);
531:            }
532:
533:            /**
534:             * Adds a membership listener to the channel.<br>
535:             * Membership listeners are uniquely identified using the equals(Object) method
536:             * @param membershipListener MembershipListener
537:             */
538:            public void addMembershipListener(
539:                    MembershipListener membershipListener) {
540:                if (!this .membershipListeners.contains(membershipListener))
541:                    this .membershipListeners.add(membershipListener);
542:            }
543:
544:            /**
545:             * Removes a membership listener from the channel.<br>
546:             * Membership listeners are uniquely identified using the equals(Object) method
547:             * @param membershipListener MembershipListener
548:             */
549:
550:            public void removeMembershipListener(
551:                    MembershipListener membershipListener) {
552:                membershipListeners.remove(membershipListener);
553:            }
554:
555:            /**
556:             * Adds a channel listener to the channel.<br>
557:             * Channel listeners are uniquely identified using the equals(Object) method
558:             * @param channelListener ChannelListener
559:             */
560:            public void addChannelListener(ChannelListener channelListener) {
561:                if (!this .channelListeners.contains(channelListener)) {
562:                    this .channelListeners.add(channelListener);
563:                } else {
564:                    throw new IllegalArgumentException(
565:                            "Listener already exists:" + channelListener + "["
566:                                    + channelListener.getClass().getName()
567:                                    + "]");
568:                }
569:            }
570:
571:            /**
572:             *
573:             * Removes a channel listener from the channel.<br>
574:             * Channel listeners are uniquely identified using the equals(Object) method
575:             * @param channelListener ChannelListener
576:             */
577:            public void removeChannelListener(ChannelListener channelListener) {
578:                channelListeners.remove(channelListener);
579:            }
580:
581:            /**
582:             * Returns an iterator of all the interceptors in this stack
583:             * @return Iterator
584:             */
585:            public Iterator getInterceptors() {
586:                return new InterceptorIterator(this .getNext(), this .coordinator);
587:            }
588:
589:            /**
590:             * Enables/disables the option check<br>
591:             * Setting this to true, will make the GroupChannel perform a conflict check
592:             * on the interceptors. If two interceptors are using the same option flag
593:             * and throw an error upon start.
594:             * @param optionCheck boolean
595:             */
596:            public void setOptionCheck(boolean optionCheck) {
597:                this .optionCheck = optionCheck;
598:            }
599:
600:            /**
601:             * Configure local heartbeat sleep time<br>
602:             * Only used when <code>getHeartbeat()==true</code>
603:             * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats
604:             */
605:            public void setHeartbeatSleeptime(long heartbeatSleeptime) {
606:                this .heartbeatSleeptime = heartbeatSleeptime;
607:            }
608:
609:            /**
610:             * Enables or disables local heartbeat.
611:             * if <code>setHeartbeat(true)</code> is invoked then the channel will start an internal
612:             * thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds
613:             * @param heartbeat boolean
614:             */
615:            public void setHeartbeat(boolean heartbeat) {
616:                this .heartbeat = heartbeat;
617:            }
618:
619:            /**
620:             * @see #setOptionCheck(boolean)
621:             * @return boolean
622:             */
623:            public boolean getOptionCheck() {
624:                return optionCheck;
625:            }
626:
627:            /**
628:             * @see #setHeartbeat(boolean)
629:             * @return boolean
630:             */
631:            public boolean getHeartbeat() {
632:                return heartbeat;
633:            }
634:
635:            /**
636:             * Returns the sleep time in milliseconds that the internal heartbeat will
637:             * sleep in between invokations of <code>Channel.heartbeat()</code>
638:             * @return long
639:             */
640:            public long getHeartbeatSleeptime() {
641:                return heartbeatSleeptime;
642:            }
643:
644:            /**
645:             *
646:             * <p>Title: Interceptor Iterator</p>
647:             *
648:             * <p>Description: An iterator to loop through the interceptors in a channel</p>
649:             *
650:             * @version 1.0
651:             */
652:            public static class InterceptorIterator implements  Iterator {
653:                private ChannelInterceptor end;
654:                private ChannelInterceptor start;
655:
656:                public InterceptorIterator(ChannelInterceptor start,
657:                        ChannelInterceptor end) {
658:                    this .end = end;
659:                    this .start = start;
660:                }
661:
662:                public boolean hasNext() {
663:                    return start != null && start != end;
664:                }
665:
666:                public Object next() {
667:                    Object result = null;
668:                    if (hasNext()) {
669:                        result = start;
670:                        start = start.getNext();
671:                    }
672:                    return result;
673:                }
674:
675:                public void remove() {
676:                    //empty operation
677:                }
678:            }
679:
680:            /**
681:             *
682:             * <p>Title: Internal heartbeat thread</p>
683:             *
684:             * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class
685:             * is created</p>
686:             *
687:             * @version 1.0
688:             */
689:            public static class HeartbeatThread extends Thread {
690:                protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
691:                        .getLog(HeartbeatThread.class);
692:                protected static int counter = 1;
693:
694:                protected static synchronized int inc() {
695:                    return counter++;
696:                }
697:
698:                protected boolean doRun = true;
699:                protected GroupChannel channel;
700:                protected long sleepTime;
701:
702:                public HeartbeatThread(GroupChannel channel, long sleepTime) {
703:                    super ();
704:                    this .setPriority(MIN_PRIORITY);
705:                    setName("GroupChannel-Heartbeat-" + inc());
706:                    setDaemon(true);
707:                    this .channel = channel;
708:                    this .sleepTime = sleepTime;
709:                }
710:
711:                public void stopHeartbeat() {
712:                    doRun = false;
713:                    interrupt();
714:                }
715:
716:                public void run() {
717:                    while (doRun) {
718:                        try {
719:                            Thread.sleep(sleepTime);
720:                            channel.heartbeat();
721:                        } catch (InterruptedException x) {
722:                            interrupted();
723:                        } catch (Exception x) {
724:                            log
725:                                    .error(
726:                                            "Unable to send heartbeat through Tribes interceptor stack. Will try to sleep again.",
727:                                            x);
728:                        }//catch
729:                    }//while
730:                }//run
731:            }//HeartbeatThread
732:
733:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.