Source Code Cross Referenced for MessageDispatcher.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » blocks » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.blocks 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jgroups.blocks;
002:
003:        import org.apache.commons.logging.Log;
004:        import org.apache.commons.logging.LogFactory;
005:        import org.jgroups.*;
006:        import org.jgroups.stack.Protocol;
007:        import org.jgroups.stack.StateTransferInfo;
008:        import org.jgroups.util.*;
009:
010:        import java.io.InputStream;
011:        import java.io.OutputStream;
012:        import java.io.Serializable;
013:        import java.util.Vector;
014:        import java.util.Collection;
015:        import java.util.TreeSet;
016:        import java.util.ArrayList;
017:
018:        /**
019:         * Provides synchronous and asynchronous message sending with request-response
020:         * correlation; i.e., matching responses with the original request.
021:         * It also offers push-style message reception (by internally using the PullPushAdapter).
022:         * <p>
023:         * Channels are simple patterns to asynchronously send a receive messages.
024:         * However, a significant number of communication patterns in group communication
025:         * require synchronous communication. For example, a sender would like to send a
026:         * message to the group and wait for all responses. Or another application would
027:         * like to send a message to the group and wait only until the majority of the
028:         * receivers have sent a response, or until a timeout occurred.  MessageDispatcher
029:         * offers a combination of the above pattern with other patterns.
030:         * <p>
031:         * Used on top of channel to implement group requests. Client's <code>handle()</code>
032:         * method is called when request is received. Is the equivalent of RpcProtocol on
033:         * the application instead of protocol level.
034:         *
035:         * @author Bela Ban
036:         * @version $Id: MessageDispatcher.java,v 1.60.2.3 2007/03/08 10:14:45 belaban Exp $
037:         */
038:        public class MessageDispatcher implements  RequestHandler {
039:            protected Channel channel = null;
040:            protected RequestCorrelator corr = null;
041:            protected MessageListener msg_listener = null;
042:            protected MembershipListener membership_listener = null;
043:            protected RequestHandler req_handler = null;
044:            protected ProtocolAdapter prot_adapter = null;
045:            protected TransportAdapter transport_adapter = null;
046:            protected final Collection members = new TreeSet();
047:            protected Address local_addr = null;
048:            protected boolean deadlock_detection = false;
049:            protected PullPushAdapter adapter = null;
050:            protected PullPushHandler handler = null;
051:            protected Serializable id = null;
052:            protected final Log log = LogFactory.getLog(getClass());
053:
054:            /**
055:             * Process items on the queue concurrently (RequestCorrelator). The default is to wait until the processing of an
056:             * item has completed before fetching the next item from the queue. Note that setting this to true may destroy the
057:             * properties of a protocol stack, e.g total or causal order may not be guaranteed. Set this to true only if you
058:             * know what you're doing !
059:             */
060:            protected boolean concurrent_processing = false;
061:
062:            public MessageDispatcher(Channel channel, MessageListener l,
063:                    MembershipListener l2) {
064:                this .channel = channel;
065:                prot_adapter = new ProtocolAdapter();
066:                if (channel != null) {
067:                    local_addr = channel.getLocalAddress();
068:                }
069:                setMessageListener(l);
070:                setMembershipListener(l2);
071:                if (channel != null) {
072:                    channel.setUpHandler(prot_adapter);
073:                }
074:                start();
075:            }
076:
077:            public MessageDispatcher(Channel channel, MessageListener l,
078:                    MembershipListener l2, boolean deadlock_detection) {
079:                this .channel = channel;
080:                this .deadlock_detection = deadlock_detection;
081:                prot_adapter = new ProtocolAdapter();
082:                if (channel != null) {
083:                    local_addr = channel.getLocalAddress();
084:                }
085:                setMessageListener(l);
086:                setMembershipListener(l2);
087:                if (channel != null) {
088:                    channel.setUpHandler(prot_adapter);
089:                }
090:                start();
091:            }
092:
093:            public MessageDispatcher(Channel channel, MessageListener l,
094:                    MembershipListener l2, boolean deadlock_detection,
095:                    boolean concurrent_processing) {
096:                this .channel = channel;
097:                this .deadlock_detection = deadlock_detection;
098:                this .concurrent_processing = concurrent_processing;
099:                prot_adapter = new ProtocolAdapter();
100:                if (channel != null) {
101:                    local_addr = channel.getLocalAddress();
102:                }
103:                setMessageListener(l);
104:                setMembershipListener(l2);
105:                if (channel != null) {
106:                    channel.setUpHandler(prot_adapter);
107:                }
108:                start();
109:            }
110:
111:            public MessageDispatcher(Channel channel, MessageListener l,
112:                    MembershipListener l2, RequestHandler req_handler) {
113:                this (channel, l, l2);
114:                setRequestHandler(req_handler);
115:            }
116:
117:            public MessageDispatcher(Channel channel, MessageListener l,
118:                    MembershipListener l2, RequestHandler req_handler,
119:                    boolean deadlock_detection) {
120:                this (channel, l, l2, deadlock_detection, false);
121:                setRequestHandler(req_handler);
122:            }
123:
124:            public MessageDispatcher(Channel channel, MessageListener l,
125:                    MembershipListener l2, RequestHandler req_handler,
126:                    boolean deadlock_detection, boolean concurrent_processing) {
127:                this (channel, l, l2, deadlock_detection, concurrent_processing);
128:                setRequestHandler(req_handler);
129:            }
130:
131:            /*
132:             * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be
133:             * used to register under that id. This is typically used when another building block is already using
134:             * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
135:             * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
136:             * first block created on PullPushAdapter.
137:             * @param adapter The PullPushAdapter which to use as underlying transport
138:             * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
139:             *           requests/responses for different building blocks on top of PullPushAdapter.
140:             */
141:            public MessageDispatcher(PullPushAdapter adapter, Serializable id,
142:                    MessageListener l, MembershipListener l2) {
143:                this .adapter = adapter;
144:                this .id = id;
145:                setMembers(((Channel) adapter.getTransport()).getView()
146:                        .getMembers());
147:                setMessageListener(l);
148:                setMembershipListener(l2);
149:                handler = new PullPushHandler();
150:                transport_adapter = new TransportAdapter();
151:                adapter.addMembershipListener(handler); // remove in stop()
152:                if (id == null) { // no other building block around, let's become the main consumer of this PullPushAdapter
153:                    adapter.setListener(handler);
154:                } else {
155:                    adapter.registerListener(id, handler);
156:                }
157:
158:                Transport tp;
159:                if ((tp = adapter.getTransport()) instanceof  Channel) {
160:                    local_addr = ((Channel) tp).getLocalAddress();
161:                }
162:                start();
163:            }
164:
165:            /*
166:             * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be
167:             * used to register under that id. This is typically used when another building block is already using
168:             * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
169:             * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
170:             * first block created on PullPushAdapter.
171:             * @param adapter The PullPushAdapter which to use as underlying transport
172:             * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
173:             *           requests/responses for different building blocks on top of PullPushAdapter.
174:             * @param req_handler The object implementing RequestHandler. It will be called when a request is received
175:             */
176:            public MessageDispatcher(PullPushAdapter adapter, Serializable id,
177:                    MessageListener l, MembershipListener l2,
178:                    RequestHandler req_handler) {
179:                this .adapter = adapter;
180:                this .id = id;
181:                setMembers(((Channel) adapter.getTransport()).getView()
182:                        .getMembers());
183:                setRequestHandler(req_handler);
184:                setMessageListener(l);
185:                setMembershipListener(l2);
186:                handler = new PullPushHandler();
187:                transport_adapter = new TransportAdapter();
188:                adapter.addMembershipListener(handler);
189:                if (id == null) { // no other building block around, let's become the main consumer of this PullPushAdapter
190:                    adapter.setListener(handler);
191:                } else {
192:                    adapter.registerListener(id, handler);
193:                }
194:
195:                Transport tp;
196:                if ((tp = adapter.getTransport()) instanceof  Channel) {
197:                    local_addr = ((Channel) tp).getLocalAddress(); // fixed bug #800774
198:                }
199:
200:                start();
201:            }
202:
203:            public MessageDispatcher(PullPushAdapter adapter, Serializable id,
204:                    MessageListener l, MembershipListener l2,
205:                    RequestHandler req_handler, boolean concurrent_processing) {
206:                this .concurrent_processing = concurrent_processing;
207:                this .adapter = adapter;
208:                this .id = id;
209:                setMembers(((Channel) adapter.getTransport()).getView()
210:                        .getMembers());
211:                setRequestHandler(req_handler);
212:                setMessageListener(l);
213:                setMembershipListener(l2);
214:                handler = new PullPushHandler();
215:                transport_adapter = new TransportAdapter();
216:                adapter.addMembershipListener(handler);
217:                if (id == null) { // no other building block around, let's become the main consumer of this PullPushAdapter
218:                    adapter.setListener(handler);
219:                } else {
220:                    adapter.registerListener(id, handler);
221:                }
222:
223:                Transport tp;
224:                if ((tp = adapter.getTransport()) instanceof  Channel) {
225:                    local_addr = ((Channel) tp).getLocalAddress(); // fixed bug #800774
226:                }
227:
228:                start();
229:            }
230:
231:            /** Returns a copy of members */
232:            protected Collection getMembers() {
233:                synchronized (members) {
234:                    return new ArrayList(members);
235:                }
236:            }
237:
238:            /**
239:             * If this dispatcher is using a user-provided PullPushAdapter, then need to set the members from the adapter
240:             * initially since viewChange has most likely already been called in PullPushAdapter.
241:             */
242:            private void setMembers(Vector new_mbrs) {
243:                if (new_mbrs != null) {
244:                    synchronized (members) {
245:                        members.clear();
246:                        members.addAll(new_mbrs);
247:                    }
248:                }
249:            }
250:
251:            public void setDeadlockDetection(boolean flag) {
252:                deadlock_detection = flag;
253:                if (corr != null)
254:                    corr.setDeadlockDetection(flag);
255:            }
256:
257:            public void setConcurrentProcessing(boolean flag) {
258:                this .concurrent_processing = flag;
259:                if (corr != null)
260:                    corr.setConcurrentProcessing(flag);
261:            }
262:
263:            public final void start() {
264:                if (corr == null) {
265:                    if (transport_adapter != null) {
266:                        corr = new RequestCorrelator("MessageDispatcher",
267:                                transport_adapter, this , deadlock_detection,
268:                                local_addr, concurrent_processing);
269:                    } else {
270:                        corr = new RequestCorrelator("MessageDispatcher",
271:                                prot_adapter, this , deadlock_detection,
272:                                local_addr, concurrent_processing);
273:                    }
274:                }
275:                correlatorStarted();
276:                corr.start();
277:                if (channel != null) {
278:                    Vector tmp_mbrs = channel.getView() != null ? channel
279:                            .getView().getMembers() : null;
280:                    setMembers(tmp_mbrs);
281:                }
282:            }
283:
284:            protected void correlatorStarted() {
285:                ;
286:            }
287:
288:            public void stop() {
289:                if (corr != null) {
290:                    corr.stop();
291:                }
292:
293:                // fixes leaks of MembershipListeners (http://jira.jboss.com/jira/browse/JGRP-160)
294:                if (adapter != null && handler != null) {
295:                    adapter.removeMembershipListener(handler);
296:                }
297:            }
298:
299:            public final void setMessageListener(MessageListener l) {
300:                msg_listener = l;
301:            }
302:
303:            /**
304:             * Gives access to the currently configured MessageListener. Returns null if there is no
305:             * configured MessageListener.
306:             */
307:            public MessageListener getMessageListener() {
308:                return msg_listener;
309:            }
310:
311:            public final void setMembershipListener(MembershipListener l) {
312:                membership_listener = l;
313:            }
314:
315:            public final void setRequestHandler(RequestHandler rh) {
316:                req_handler = rh;
317:            }
318:
319:            /**
320:             * Offers access to the underlying Channel.
321:             * @return a reference to the underlying Channel.
322:             */
323:            public Channel getChannel() {
324:                return channel;
325:            }
326:
327:            public void send(Message msg) throws ChannelNotConnectedException,
328:                    ChannelClosedException {
329:                if (channel != null) {
330:                    channel.send(msg);
331:                } else if (adapter != null) {
332:                    try {
333:                        if (id != null) {
334:                            adapter.send(id, msg);
335:                        } else {
336:                            adapter.send(msg);
337:                        }
338:                    } catch (Throwable ex) {
339:                        if (log.isErrorEnabled()) {
340:                            log.error("exception=" + Util.print(ex));
341:                        }
342:                    }
343:                } else {
344:                    if (log.isErrorEnabled()) {
345:                        log.error("channel == null");
346:                    }
347:                }
348:            }
349:
350:            public RspList castMessage(final Vector dests, Message msg,
351:                    int mode, long timeout) {
352:                return castMessage(dests, msg, mode, timeout, false);
353:            }
354:
355:            /**
356:             * Cast a message to all members, and wait for <code>mode</code> responses. The responses are returned in a response
357:             * list, where each response is associated with its sender.<p> Uses <code>GroupRequest</code>.
358:             *
359:             * @param dests   The members to which the message is to be sent. If it is null, then the message is sent to all
360:             *                members
361:             * @param msg     The message to be sent to n members
362:             * @param mode    Defined in <code>GroupRequest</code>. The number of responses to wait for: <ol> <li>GET_FIRST:
363:             *                return the first response received. <li>GET_ALL: wait for all responses (minus the ones from
364:             *                suspected members) <li>GET_MAJORITY: wait for a majority of all responses (relative to the grp
365:             *                size) <li>GET_ABS_MAJORITY: wait for majority (absolute, computed once) <li>GET_N: wait for n
366:             *                responses (may block if n > group size) <li>GET_NONE: wait for no responses, return immediately
367:             *                (non-blocking) </ol>
368:             * @param timeout If 0: wait forever. Otherwise, wait for <code>mode</code> responses <em>or</em> timeout time.
369:             * @return RspList A list of responses. Each response is an <code>Object</code> and associated to its sender.
370:             */
371:            public RspList castMessage(final Vector dests, Message msg,
372:                    int mode, long timeout, boolean use_anycasting) {
373:                GroupRequest _req = null;
374:                Vector real_dests;
375:                Channel tmp;
376:
377:                // we need to clone because we don't want to modify the original
378:                // (we remove ourselves if LOCAL is false, see below) !
379:                // real_dests=dests != null ? (Vector) dests.clone() : (members != null ? new Vector(members) : null);
380:                if (dests != null) {
381:                    real_dests = (Vector) dests.clone();
382:                } else {
383:                    synchronized (members) {
384:                        real_dests = new Vector(members);
385:                    }
386:                }
387:
388:                // if local delivery is off, then we should not wait for the message from the local member.
389:                // therefore remove it from the membership
390:                tmp = channel;
391:                if (tmp == null) {
392:                    if (adapter != null
393:                            && adapter.getTransport() instanceof  Channel) {
394:                        tmp = (Channel) adapter.getTransport();
395:                    }
396:                }
397:
398:                if (tmp != null
399:                        && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) {
400:                    if (local_addr == null) {
401:                        local_addr = tmp.getLocalAddress();
402:                    }
403:                    if (local_addr != null && real_dests != null) {
404:                        real_dests.removeElement(local_addr);
405:                    }
406:                }
407:
408:                // don't even send the message if the destination list is empty
409:                if (log.isTraceEnabled())
410:                    log.trace("real_dests=" + real_dests);
411:
412:                if (real_dests == null || real_dests.size() == 0) {
413:                    if (log.isTraceEnabled())
414:                        log
415:                                .trace("destination list is empty, won't send message");
416:                    return new RspList(); // return empty response list
417:                }
418:
419:                _req = new GroupRequest(msg, corr, real_dests, mode, timeout, 0);
420:                _req.setCaller(this .local_addr);
421:                try {
422:                    _req.execute(use_anycasting);
423:                } catch (Exception ex) {
424:                    throw new RuntimeException("failed executing request "
425:                            + _req, ex);
426:                }
427:
428:                return _req.getResults();
429:            }
430:
431:            /**
432:             * Multicast a message request to all members in <code>dests</code> and receive responses via the RspCollector
433:             * interface. When done receiving the required number of responses, the caller has to call done(req_id) on the
434:             * underlyinh RequestCorrelator, so that the resources allocated to that request can be freed.
435:             *
436:             * @param dests  The list of members from which to receive responses. Null means all members
437:             * @param req_id The ID of the request. Used by the underlying RequestCorrelator to correlate responses with
438:             *               requests
439:             * @param msg    The request to be sent
440:             * @param coll   The sender needs to provide this interface to collect responses. Call will return immediately if
441:             *               this is null
442:             */
443:            public void castMessage(final Vector dests, long req_id,
444:                    Message msg, RspCollector coll) {
445:                Vector real_dests;
446:                Channel tmp;
447:
448:                if (msg == null) {
449:                    if (log.isErrorEnabled())
450:                        log.error("request is null");
451:                    return;
452:                }
453:
454:                if (coll == null) {
455:                    if (log.isErrorEnabled())
456:                        log
457:                                .error("response collector is null (must be non-null)");
458:                    return;
459:                }
460:
461:                // we need to clone because we don't want to modify the original
462:                // (we remove ourselves if LOCAL is false, see below) !
463:                //real_dests=dests != null ? (Vector) dests.clone() : (Vector) members.clone();
464:                if (dests != null) {
465:                    real_dests = (Vector) dests.clone();
466:                } else {
467:                    synchronized (members) {
468:                        real_dests = new Vector(members);
469:                    }
470:                }
471:
472:                // if local delivery is off, then we should not wait for the message from the local member.
473:                // therefore remove it from the membership
474:                tmp = channel;
475:                if (tmp == null) {
476:                    if (adapter != null
477:                            && adapter.getTransport() instanceof  Channel) {
478:                        tmp = (Channel) adapter.getTransport();
479:                    }
480:                }
481:
482:                if (tmp != null
483:                        && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) {
484:                    if (local_addr == null) {
485:                        local_addr = tmp.getLocalAddress();
486:                    }
487:                    if (local_addr != null) {
488:                        real_dests.removeElement(local_addr);
489:                    }
490:                }
491:
492:                // don't even send the message if the destination list is empty
493:                if (real_dests.size() == 0) {
494:                    if (log.isDebugEnabled())
495:                        log
496:                                .debug("destination list is empty, won't send message");
497:                    return;
498:                }
499:
500:                try {
501:                    corr.sendRequest(req_id, real_dests, msg, coll);
502:                } catch (Exception e) {
503:                    throw new RuntimeException("failure sending request "
504:                            + req_id + " to " + real_dests, e);
505:                }
506:            }
507:
508:            public void done(long req_id) {
509:                corr.done(req_id);
510:            }
511:
512:            /**
513:             * Sends a message to a single member (destination = msg.dest) and returns the response. The message's destination
514:             * must be non-zero !
515:             */
516:            public Object sendMessage(Message msg, int mode, long timeout)
517:                    throws TimeoutException, SuspectedException {
518:                Vector mbrs = new Vector();
519:                RspList rsp_list = null;
520:                Object dest = msg.getDest();
521:                Rsp rsp;
522:                GroupRequest _req = null;
523:
524:                if (dest == null) {
525:                    if (log.isErrorEnabled())
526:                        log
527:                                .error("the message's destination is null, cannot send message");
528:                    return null;
529:                }
530:
531:                mbrs.addElement(dest); // dummy membership (of destination address)
532:
533:                _req = new GroupRequest(msg, corr, mbrs, mode, timeout, 0);
534:                _req.setCaller(local_addr);
535:                try {
536:                    _req.execute();
537:                } catch (Exception t) {
538:                    throw new RuntimeException("failed executing request "
539:                            + _req, t);
540:                }
541:
542:                if (mode == GroupRequest.GET_NONE) {
543:                    return null;
544:                }
545:
546:                rsp_list = _req.getResults();
547:
548:                if (rsp_list.size() == 0) {
549:                    if (log.isWarnEnabled())
550:                        log.warn(" response list is empty");
551:                    return null;
552:                }
553:                if (rsp_list.size() > 1) {
554:                    if (log.isWarnEnabled())
555:                        log
556:                                .warn("response list contains more that 1 response; returning first response !");
557:                }
558:                rsp = (Rsp) rsp_list.elementAt(0);
559:                if (rsp.wasSuspected()) {
560:                    throw new SuspectedException(dest);
561:                }
562:                if (!rsp.wasReceived()) {
563:                    throw new TimeoutException();
564:                }
565:                return rsp.getValue();
566:            }
567:
568:            //    public void channelConnected(Channel channel) {
569:            //        if(channel != null) {
570:            //            Address new_local_addr=channel.getLocalAddress();
571:            //            if(new_local_addr != null) {
572:            //                this.local_addr=new_local_addr;
573:            //
574:            //                    if(log.isInfoEnabled()) log.info("MessageDispatcher.channelConnected()", "new local address is " + this.local_addr);
575:            //            }
576:            //        }
577:            //    }
578:            //
579:            //    public void channelDisconnected(Channel channel) {
580:            //    }
581:            //
582:            //    public void channelClosed(Channel channel) {
583:            //    }
584:            //
585:            //    public void channelShunned() {
586:            //    }
587:            //
588:            //    public void channelReconnected(Address addr) {
589:            //        if(channel != null) {
590:            //            Address new_local_addr=channel.getLocalAddress();
591:            //            if(new_local_addr != null) {
592:            //                this.local_addr=new_local_addr;
593:            //
594:            //                    if(log.isInfoEnabled()) log.info("MessageDispatcher.channelReconnected()", "new local address is " + this.local_addr);
595:            //            }
596:            //        }
597:            //    }
598:
599:            /* ------------------------ RequestHandler Interface ---------------------- */
600:            public Object handle(Message msg) {
601:                if (req_handler != null) {
602:                    return req_handler.handle(msg);
603:                } else {
604:                    return null;
605:                }
606:            }
607:
608:            /* -------------------- End of RequestHandler Interface ------------------- */
609:
610:            class ProtocolAdapter extends Protocol implements  UpHandler {
611:
612:                /* ------------------------- Protocol Interface --------------------------- */
613:
614:                public String getName() {
615:                    return "MessageDispatcher";
616:                }
617:
618:                public void startUpHandler() {
619:                    // do nothing, DON'T REMOVE !!!!
620:                }
621:
622:                public void startDownHandler() {
623:                    // do nothing, DON'T REMOVE !!!!
624:                }
625:
626:                public void stopInternal() {
627:                    // do nothing, DON'T REMOVE !!!!
628:                }
629:
630:                protected void receiveUpEvent(Event evt) {
631:                }
632:
633:                protected void receiveDownEvent(Event evt) {
634:                }
635:
636:                /**
637:                 * Called by request correlator when message was not generated by it. We handle it and call the message
638:                 * listener's corresponding methods
639:                 */
640:                public void passUp(Event evt) {
641:                    switch (evt.getType()) {
642:                    case Event.MSG:
643:                        if (msg_listener != null) {
644:                            msg_listener.receive((Message) evt.getArg());
645:                        }
646:                        break;
647:
648:                    case Event.GET_APPLSTATE: // reply with GET_APPLSTATE_OK
649:                        StateTransferInfo info = (StateTransferInfo) evt
650:                                .getArg();
651:                        String state_id = info.state_id;
652:                        byte[] tmp_state = null;
653:                        if (msg_listener != null) {
654:                            try {
655:                                if (msg_listener instanceof  ExtendedMessageListener
656:                                        && state_id != null) {
657:                                    tmp_state = ((ExtendedMessageListener) msg_listener)
658:                                            .getState(state_id);
659:                                } else {
660:                                    tmp_state = msg_listener.getState();
661:                                }
662:                            } catch (Throwable t) {
663:                                this .log.error(
664:                                        "failed getting state from message listener ("
665:                                                + msg_listener + ')', t);
666:                            }
667:                        }
668:                        channel.returnState(tmp_state, state_id);
669:                        break;
670:
671:                    case Event.GET_STATE_OK:
672:                        if (msg_listener != null) {
673:                            try {
674:                                info = (StateTransferInfo) evt.getArg();
675:                                String id = info.state_id;
676:                                if (msg_listener instanceof  ExtendedMessageListener
677:                                        && id != null) {
678:                                    ((ExtendedMessageListener) msg_listener)
679:                                            .setState(id, info.state);
680:                                } else {
681:                                    msg_listener.setState(info.state);
682:                                }
683:                            } catch (ClassCastException cast_ex) {
684:                                if (this .log.isErrorEnabled())
685:                                    this .log
686:                                            .error("received SetStateEvent, but argument "
687:                                                    + evt.getArg()
688:                                                    + " is not serializable. Discarding message.");
689:                            }
690:                        }
691:                        break;
692:
693:                    case Event.STATE_TRANSFER_OUTPUTSTREAM:
694:                        if (msg_listener != null) {
695:                            StateTransferInfo sti = (StateTransferInfo) evt
696:                                    .getArg();
697:                            OutputStream os = sti.outputStream;
698:                            if (os != null
699:                                    && msg_listener instanceof  ExtendedMessageListener) {
700:                                if (sti.state_id == null)
701:                                    ((ExtendedMessageListener) msg_listener)
702:                                            .getState(os);
703:                                else
704:                                    ((ExtendedMessageListener) msg_listener)
705:                                            .getState(sti.state_id, os);
706:                            }
707:                            return;
708:                        }
709:                        break;
710:
711:                    case Event.STATE_TRANSFER_INPUTSTREAM:
712:                        if (msg_listener != null) {
713:                            StateTransferInfo sti = (StateTransferInfo) evt
714:                                    .getArg();
715:                            InputStream is = sti.inputStream;
716:                            if (is != null
717:                                    && msg_listener instanceof  ExtendedMessageListener) {
718:                                if (sti.state_id == null)
719:                                    ((ExtendedMessageListener) msg_listener)
720:                                            .setState(is);
721:                                else
722:                                    ((ExtendedMessageListener) msg_listener)
723:                                            .setState(sti.state_id, is);
724:                            }
725:                        }
726:                        break;
727:
728:                    case Event.VIEW_CHANGE:
729:                        View v = (View) evt.getArg();
730:                        Vector new_mbrs = v.getMembers();
731:                        setMembers(new_mbrs);
732:                        if (membership_listener != null) {
733:                            membership_listener.viewAccepted(v);
734:                        }
735:                        break;
736:
737:                    case Event.SET_LOCAL_ADDRESS:
738:                        if (log.isTraceEnabled())
739:                            log.trace("setting local_addr (" + local_addr
740:                                    + ") to " + evt.getArg());
741:                        local_addr = (Address) evt.getArg();
742:                        break;
743:
744:                    case Event.SUSPECT:
745:                        if (membership_listener != null) {
746:                            membership_listener.suspect((Address) evt.getArg());
747:                        }
748:                        break;
749:
750:                    case Event.BLOCK:
751:                        if (membership_listener != null) {
752:                            membership_listener.block();
753:                        }
754:                        channel.blockOk();
755:                        break;
756:                    case Event.UNBLOCK:
757:                        if (membership_listener instanceof  ExtendedMembershipListener) {
758:                            ((ExtendedMembershipListener) membership_listener)
759:                                    .unblock();
760:                        }
761:                        break;
762:                    }
763:                }
764:
765:                public void passDown(Event evt) {
766:                    down(evt);
767:                }
768:
769:                /**
770:                 * Called by channel (we registered before) when event is received. This is the UpHandler interface.
771:                 */
772:                public void up(Event evt) {
773:                    if (corr != null) {
774:                        corr.receive(evt); // calls passUp()
775:                    } else {
776:                        if (log.isErrorEnabled()) { //Something is seriously wrong, correlator should not be null since latch is not locked!
777:                            log
778:                                    .error("correlator is null, event will be ignored (evt="
779:                                            + evt + ")");
780:                        }
781:                    }
782:                }
783:
784:                public void down(Event evt) {
785:                    if (channel != null) {
786:                        channel.down(evt);
787:                    } else if (this .log.isWarnEnabled()) {
788:                        this .log.warn("channel is null, discarding event "
789:                                + evt);
790:                    }
791:                }
792:                /* ----------------------- End of Protocol Interface ------------------------ */
793:
794:            }
795:
796:            class TransportAdapter implements  Transport {
797:
798:                public void send(Message msg) throws Exception {
799:                    if (channel != null) {
800:                        channel.send(msg);
801:                    } else if (adapter != null) {
802:                        try {
803:                            if (id != null) {
804:                                adapter.send(id, msg);
805:                            } else {
806:                                adapter.send(msg);
807:                            }
808:                        } catch (Throwable ex) {
809:                            if (log.isErrorEnabled()) {
810:                                log.error("exception=" + Util.print(ex));
811:                            }
812:                        }
813:                    } else {
814:                        if (log.isErrorEnabled()) {
815:                            log.error("channel == null");
816:                        }
817:                    }
818:                }
819:
820:                public Object receive(long timeout) throws Exception {
821:                    return null;
822:                }
823:            }
824:
825:            class PullPushHandler implements  ExtendedMessageListener,
826:                    MembershipListener {
827:
828:                /* ------------------------- MessageListener interface ---------------------- */
829:                public void receive(Message msg) {
830:                    boolean pass_up = true;
831:                    if (corr != null) {
832:                        pass_up = corr.receiveMessage(msg);
833:                    }
834:
835:                    if (pass_up) { // pass on to MessageListener
836:                        if (msg_listener != null) {
837:                            msg_listener.receive(msg);
838:                        }
839:                    }
840:                }
841:
842:                public byte[] getState() {
843:                    return msg_listener != null ? msg_listener.getState()
844:                            : null;
845:                }
846:
847:                public byte[] getState(String state_id) {
848:                    if (msg_listener == null)
849:                        return null;
850:                    if (msg_listener instanceof  ExtendedMessageListener
851:                            && state_id != null) {
852:                        return ((ExtendedMessageListener) msg_listener)
853:                                .getState(state_id);
854:                    } else {
855:                        return msg_listener.getState();
856:                    }
857:                }
858:
859:                public void setState(byte[] state) {
860:                    if (msg_listener != null) {
861:                        msg_listener.setState(state);
862:                    }
863:                }
864:
865:                public void setState(String state_id, byte[] state) {
866:                    if (msg_listener != null) {
867:                        if (msg_listener instanceof  ExtendedMessageListener
868:                                && state_id != null) {
869:                            ((ExtendedMessageListener) msg_listener).setState(
870:                                    state_id, state);
871:                        } else {
872:                            msg_listener.setState(state);
873:                        }
874:                    }
875:                }
876:
877:                public void getState(OutputStream ostream) {
878:                    if (msg_listener instanceof  ExtendedMessageListener) {
879:                        ((ExtendedMessageListener) msg_listener)
880:                                .getState(ostream);
881:                    }
882:                }
883:
884:                public void getState(String state_id, OutputStream ostream) {
885:                    if (msg_listener instanceof  ExtendedMessageListener
886:                            && state_id != null) {
887:                        ((ExtendedMessageListener) msg_listener).getState(
888:                                state_id, ostream);
889:                    }
890:
891:                }
892:
893:                public void setState(InputStream istream) {
894:                    if (msg_listener instanceof  ExtendedMessageListener) {
895:                        ((ExtendedMessageListener) msg_listener)
896:                                .setState(istream);
897:                    }
898:                }
899:
900:                public void setState(String state_id, InputStream istream) {
901:                    if (msg_listener instanceof  ExtendedMessageListener
902:                            && state_id != null) {
903:                        ((ExtendedMessageListener) msg_listener).setState(
904:                                state_id, istream);
905:                    }
906:                }
907:
908:                /*
909:                 * --------------------- End of MessageListener interface
910:                 * -------------------
911:                 */
912:
913:                /* ------------------------ MembershipListener interface -------------------- */
914:                public void viewAccepted(View v) {
915:                    if (corr != null) {
916:                        corr.receiveView(v);
917:                    }
918:
919:                    Vector new_mbrs = v.getMembers();
920:                    setMembers(new_mbrs);
921:                    if (membership_listener != null) {
922:                        membership_listener.viewAccepted(v);
923:                    }
924:                }
925:
926:                public void suspect(Address suspected_mbr) {
927:                    if (corr != null) {
928:                        corr.receiveSuspect(suspected_mbr);
929:                    }
930:                    if (membership_listener != null) {
931:                        membership_listener.suspect(suspected_mbr);
932:                    }
933:                }
934:
935:                public void block() {
936:                    if (membership_listener != null) {
937:                        membership_listener.block();
938:                    }
939:                }
940:
941:                /* --------------------- End of MembershipListener interface ---------------- */
942:
943:                // @todo: receive SET_LOCAL_ADDR event and call corr.setLocalAddress(addr)
944:            }
945:
946:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.