Source Code Cross Referenced for RequestCorrelator.java in » Net » JGroups-2.4.1-sp3 » org » jgroups » blocks » Java Source Code / Java Documentation | Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.blocks 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: RequestCorrelator.java,v 1.30.2.5 2007/04/23 10:15:57 belaban Exp $
002:
003:        package org.jgroups.blocks;
004:
005:        import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
006:        import org.apache.commons.logging.Log;
007:        import org.apache.commons.logging.LogFactory;
008:        import org.jgroups.*;
009:        import org.jgroups.stack.Protocol;
010:        import org.jgroups.util.ReusableThread;
011:        import org.jgroups.util.Scheduler;
012:        import org.jgroups.util.SchedulerListener;
013:        import org.jgroups.util.Streamable;
014:        import org.jgroups.util.ThreadLocalListener;
015:        import org.jgroups.util.Util;
016:
017:        import java.io.*;
018:        import java.util.*;
019:
020:        /**
021:         * Framework to send requests and receive matching responses (matching on
022:         * request ID).
023:         * Multiple requests can be sent at a time. Whenever a response is received,
024:         * the correct <code>RspCollector</code> is looked up (key = id) and its
025:         * method <code>receiveResponse()</code> invoked. A caller may use
026:         * <code>done()</code> to signal that no more responses are expected, and that
027:         * the corresponding entry may be removed.
028:         * <p>
029:         * <code>RequestCorrelator</code> can be installed at both client and server
030:         * sides, it can also switch roles dynamically; i.e., send a request and at
031:         * the same time process an incoming request (when local delivery is enabled,
032:         * this is actually the default).
033:         * <p>
034:         *
035:         * @author Bela Ban
036:         */
037:        public class RequestCorrelator {
038:
039:            /** The protocol layer to use to pass up/down messages. Can be either a Protocol or a Transport */
040:            protected Object transport = null;
041:
042:            /** The table of pending requests (keys=Long (request IDs), values=<tt>RequestEntry</tt>) */
043:            protected final Map requests = new ConcurrentReaderHashMap();
044:
045:            /** The handler for the incoming requests. It is called from inside the dispatcher thread */
046:            protected RequestHandler request_handler = null;
047:
048:            /** Possibility for an external marshaller to marshal/unmarshal responses */
049:            protected RpcDispatcher.Marshaller marshaller = null;
050:
051:            /** makes the instance unique (together with IDs) */
052:            protected String name = null;
053:
054:            /** The dispatching thread pool */
055:            protected Scheduler scheduler = null;
056:
057:            /** The address of this group member */
058:            protected Address local_addr = null;
059:
060:            /**
061:             * This field is used only if deadlock detection is enabled.
062:             * In case of nested synchronous requests, it holds a list of the
063:             * addreses of the senders with the address at the bottom being the
064:             * address of the first caller
065:             */
066:            protected ThreadLocal call_stack = new ThreadLocal();
067:
068:            /** Whether or not to perform deadlock detection for synchronous (potentially recursive) group method invocations.
069:             *  If on, we use a scheduler (handling a priority queue), otherwise we don't and call handleRequest() directly.
070:             */
071:            protected boolean deadlock_detection = false;
072:
073:            /**
074:             * This field is used only if deadlock detection is enabled.
075:             * It sets the calling stack to the currently running request
076:             */
077:            private CallStackSetter call_stack_setter = null;
078:
079:            /** Process items on the queue concurrently (Scheduler). The default is to wait until the processing of an item
080:             * has completed before fetching the next item from the queue. Note that setting this to true
081:             * may destroy the properties of a protocol stack, e.g total or causal order may not be
082:             * guaranteed. Set this to true only if you know what you're doing ! */
083:            protected boolean concurrent_processing = false;
084:
085:            protected boolean started = false;
086:
087:            protected static final Log log = LogFactory
088:                    .getLog(RequestCorrelator.class);
089:
/**
 * Creates and starts a correlator with deadlock detection off, no local
 * address and sequential request processing.
 *
 * @param name Used to differentiate between different RequestCorrelators
 *             (e.g. in different protocol layers). Has to be unique if
 *             multiple request correlators are used.
 * @param transport Used to send/pass up requests. Can be either a
 *                  Transport (only send() will be used then), or a
 *                  Protocol (passUp()/passDown() will be used)
 * @param handler Request handler. Method <code>handle(Message)</code>
 *                will be called when a request is received. If null,
 *                incoming requests are discarded.
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler) {
    this(name, transport, handler, false, null, false);
}
112:
/**
 * Creates and starts a correlator with a known local address; deadlock
 * detection and concurrent processing stay off.
 *
 * @param name unique name of this correlator
 * @param transport a Transport or a Protocol used to send requests
 * @param handler handler for incoming requests
 * @param local_addr the address of this group member
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler, Address local_addr) {
    this(name, transport, handler, false, local_addr, false);
}
121:
/**
 * Creates and starts a correlator, optionally with deadlock detection.
 *
 * @param name Used to differentiate between different RequestCorrelators
 *             (e.g. in different protocol layers). Has to be unique if
 *             multiple request correlators are used.
 * @param transport Used to send/pass up requests. Can be either a
 *                  Transport (only send() will be used then), or a
 *                  Protocol (passUp()/passDown() will be used)
 * @param handler Request handler. Method <code>handle(Message)</code>
 *                will be called when a request is received.
 * @param deadlock_detection When enabled (true) recursive synchronous
 *                message calls will be detected and processed with higher
 *                priority in order to solve deadlocks. Slows down
 *                processing a little bit when enabled due to the runtime
 *                checks involved.
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler, boolean deadlock_detection) {
    this(name, transport, handler, deadlock_detection, null, false);
}
150:
/**
 * Creates and starts a correlator with explicit deadlock-detection and
 * concurrent-processing settings; the local address is left unset.
 *
 * @param name unique name of this correlator
 * @param transport a Transport or a Protocol used to send requests
 * @param handler handler for incoming requests
 * @param deadlock_detection whether recursive synchronous calls are
 *                detected and scheduled with priority
 * @param concurrent_processing whether the scheduler processes queued
 *                requests concurrently
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler, boolean deadlock_detection,
        boolean concurrent_processing) {
    this(name, transport, handler, deadlock_detection, null,
            concurrent_processing);
}
161:
/**
 * Creates and starts a correlator with an explicit deadlock-detection
 * setting and a known local address; concurrent processing stays off.
 *
 * @param name unique name of this correlator
 * @param transport a Transport or a Protocol used to send requests
 * @param handler handler for incoming requests
 * @param deadlock_detection whether recursive synchronous calls are
 *                detected and scheduled with priority
 * @param local_addr the address of this group member
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler, boolean deadlock_detection,
        Address local_addr) {
    this(name, transport, handler, deadlock_detection, local_addr, false);
}
172:
/**
 * Creates and starts a correlator with every setting given explicitly.
 *
 * @param name unique name of this correlator
 * @param transport a Transport or a Protocol used to send requests
 * @param handler handler for incoming requests
 * @param deadlock_detection whether recursive synchronous calls are
 *                detected and scheduled with priority
 * @param local_addr the address of this group member
 * @param concurrent_processing whether the scheduler processes queued
 *                requests concurrently
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler, boolean deadlock_detection,
        Address local_addr, boolean concurrent_processing) {
    this.name = name;
    this.transport = transport;
    this.request_handler = handler;
    this.local_addr = local_addr;
    this.deadlock_detection = deadlock_detection;
    this.concurrent_processing = concurrent_processing;
    // All settings must be in place before start() consults them.
    start();
}
184:
185:            /**
186:             * Switch the deadlock detection mechanism on/off
187:             * @param flag the deadlock detection flag
188:             */
189:            public void setDeadlockDetection(boolean flag) {
190:                if (deadlock_detection != flag) { // only set it if different
191:                    deadlock_detection = flag;
192:                    if (started) {
193:                        if (deadlock_detection) {
194:                            startScheduler();
195:                        } else {
196:                            stopScheduler();
197:                        }
198:                    }
199:                }
200:            }
201:
202:            public void setRequestHandler(RequestHandler handler) {
203:                request_handler = handler;
204:                start();
205:            }
206:
207:            public void setConcurrentProcessing(boolean flag) {
208:                this .concurrent_processing = flag;
209:                if (deadlock_detection && scheduler != null) { // scheduler should never be null if deadlock_detection is true
210:                    scheduler.setConcurrentProcessing(flag);
211:                }
212:            }
213:
214:            /**
215:             * Helper method for {@link #sendRequest(long,List,Message,RspCollector)}.
216:             */
217:            public void sendRequest(long id, Message msg, RspCollector coll)
218:                    throws Exception {
219:                sendRequest(id, null, msg, coll);
220:            }
221:
222:            public RpcDispatcher.Marshaller getMarshaller() {
223:                return marshaller;
224:            }
225:
226:            public void setMarshaller(RpcDispatcher.Marshaller marshaller) {
227:                this .marshaller = marshaller;
228:            }
229:
230:            public void sendRequest(long id, List dest_mbrs, Message msg,
231:                    RspCollector coll) throws Exception {
232:                sendRequest(id, dest_mbrs, msg, coll, false);
233:            }
234:
235:            /**
236:             * Send a request to a group. If no response collector is given, no
237:             * responses are expected (making the call asynchronous).
238:             *
239:             * @param id The request ID. Must be unique for this JVM (e.g. current
240:             * time in millisecs)
241:             * @param dest_mbrs The list of members who should receive the call. Usually a group RPC
242:             *                  is sent via multicast, but a receiver drops the request if its own address
243:             *                  is not in this list. Will not be used if it is null.
244:             * @param msg The request to be sent. The body of the message carries
245:             * the request data
246:             *
247:             * @param coll A response collector (usually the object that invokes
248:             * this method). Its methods <code>receiveResponse()</code> and
249:             * <code>suspect()</code> will be invoked when a message has been received
250:             * or a member is suspected, respectively.
251:             */
252:            public void sendRequest(long id, List dest_mbrs, Message msg,
253:                    RspCollector coll, boolean use_anycasting) throws Exception {
254:                Header hdr;
255:
256:                if (transport == null) {
257:                    if (log.isWarnEnabled())
258:                        log.warn("transport is not available !");
259:                    return;
260:                }
261:
262:                // i. Create the request correlator header and add it to the
263:                // msg
264:                // ii. If a reply is expected (sync call / 'coll != null'), add a
265:                // coresponding entry in the pending requests table
266:                // iii. If deadlock detection is enabled, set/update the call stack
267:                // iv. Pass the msg down to the protocol layer below
268:                hdr = new Header(Header.REQ, id, (coll != null), name);
269:                hdr.dest_mbrs = dest_mbrs;
270:
271:                if (coll != null) {
272:                    if (deadlock_detection) {
273:                        if (local_addr == null) {
274:                            if (log.isErrorEnabled())
275:                                log.error("local address is null !");
276:                            return;
277:                        }
278:                        java.util.Stack local_call_stack = (java.util.Stack) call_stack
279:                                .get();
280:                        java.util.Stack new_call_stack = local_call_stack != null ? (java.util.Stack) local_call_stack
281:                                .clone()
282:                                : new java.util.Stack();
283:                        new_call_stack.push(local_addr);
284:                        hdr.callStack = new_call_stack;
285:                        if (log.isTraceEnabled()) {
286:                            log.trace(new StringBuffer("call stack=").append(
287:                                    hdr.callStack).append(" set for request ")
288:                                    .append(hdr.id));
289:                        }
290:                    }
291:                    addEntry(hdr.id, new RequestEntry(coll));
292:                }
293:                msg.putHeader(name, hdr);
294:
295:                if (transport instanceof  Protocol) {
296:                    if (use_anycasting) {
297:                        Message copy;
298:                        for (Iterator it = dest_mbrs.iterator(); it.hasNext();) {
299:                            Address mbr = (Address) it.next();
300:                            copy = msg.copy(true);
301:                            copy.setDest(mbr);
302:                            ((Protocol) transport).passDown(new Event(
303:                                    Event.MSG, copy));
304:                        }
305:                    } else {
306:                        ((Protocol) transport).passDown(new Event(Event.MSG,
307:                                msg));
308:                    }
309:                } else if (transport instanceof  Transport) {
310:                    if (use_anycasting) {
311:                        Message copy;
312:                        for (Iterator it = dest_mbrs.iterator(); it.hasNext();) {
313:                            Address mbr = (Address) it.next();
314:                            copy = msg.copy(true);
315:                            copy.setDest(mbr);
316:                            ((Transport) transport).send(copy);
317:                        }
318:                    } else {
319:                        ((Transport) transport).send(msg);
320:                    }
321:                } else
322:                    throw new IllegalStateException(
323:                            "transport has to be either a Transport or a Protocol, however it is a "
324:                                    + transport.getClass());
325:            }
326:
327:            /**
328:             * Used to signal that a certain request may be garbage collected as
329:             * all responses have been received.
330:             */
331:            public void done(long id) {
332:                removeEntry(id);
333:            }
334:
335:            /**
336:             * <b>Callback</b>.
337:             * <p>
338:             * Called by the protocol below when a message has been received. The
339:             * algorithm should test whether the message is destined for us and,
340:             * if not, pass it up to the next layer. Otherwise, it should remove
341:             * the header and check whether the message is a request or response.
342:             * In the first case, the message will be delivered to the request
343:             * handler registered (calling its <code>handle()</code> method), in the
344:             * second case, the corresponding response collector is looked up and
345:             * the message delivered.
346:             */
347:            public void receive(Event evt) {
348:                switch (evt.getType()) {
349:                case Event.SUSPECT: // don't wait for responses from faulty members
350:                    receiveSuspect((Address) evt.getArg());
351:                    break;
352:                case Event.VIEW_CHANGE: // adjust number of responses to wait for
353:                    receiveView((View) evt.getArg());
354:                    break;
355:
356:                case Event.SET_LOCAL_ADDRESS:
357:                    setLocalAddress((Address) evt.getArg());
358:                    break;
359:                case Event.MSG:
360:                    if (!receiveMessage((Message) evt.getArg()))
361:                        return;
362:                    break;
363:                }
364:                if (transport instanceof  Protocol)
365:                    ((Protocol) transport).passUp(evt);
366:                else if (log.isErrorEnabled())
367:                    log.error("we do not pass up messages via Transport");
368:            }
369:
370:            /**
371:             */
372:            public final void start() {
373:                if (deadlock_detection) {
374:                    startScheduler();
375:                }
376:                started = true;
377:            }
378:
379:            public void stop() {
380:                stopScheduler();
381:                started = false;
382:            }
383:
384:            void startScheduler() {
385:                if (scheduler == null) {
386:                    scheduler = new Scheduler();
387:                    if (deadlock_detection && call_stack_setter == null) {
388:                        call_stack_setter = new CallStackSetter();
389:                        scheduler.setListener(call_stack_setter);
390:                    }
391:                    if (concurrent_processing)
392:                        scheduler
393:                                .setConcurrentProcessing(concurrent_processing);
394:                    scheduler.start();
395:                }
396:            }
397:
398:            void stopScheduler() {
399:                if (scheduler != null) {
400:                    scheduler.stop();
401:                    scheduler = null;
402:                }
403:            }
404:
405:            // .......................................................................
406:
407:            /**
408:             * <tt>Event.SUSPECT</tt> event received from a layer below.
409:             * <p>
410:             * All response collectors currently registered will
411:             * be notified that <code>mbr</code> may have crashed, so they won't
412:             * wait for its response.
413:             */
414:            public void receiveSuspect(Address mbr) {
415:                RequestEntry entry;
416:                // ArrayList    copy;
417:
418:                if (mbr == null)
419:                    return;
420:                if (log.isDebugEnabled())
421:                    log.debug("suspect=" + mbr);
422:
423:                // copy so we don't run into bug #761804 - Bela June 27 2003
424:                // copy=new ArrayList(requests.values()); // removed because ConcurrentReaderHashMap can tolerate concurrent mods (bela May 8 2006)
425:                for (Iterator it = requests.values().iterator(); it.hasNext();) {
426:                    entry = (RequestEntry) it.next();
427:                    if (entry.coll != null)
428:                        entry.coll.suspect(mbr);
429:                }
430:            }
431:
432:            /**
433:             * <tt>Event.VIEW_CHANGE</tt> event received from a layer below.
434:             * <p>
435:             * Mark all responses from members that are not in new_view as
436:             * NOT_RECEIVED.
437:             *
438:             */
439:            public void receiveView(View new_view) {
440:                RequestEntry entry;
441:                // ArrayList    copy;
442:
443:                // copy so we don't run into bug #761804 - Bela June 27 2003
444:                // copy=new ArrayList(requests.values());  // removed because ConcurrentReaderHashMap can tolerate concurrent mods (bela May 8 2006)
445:                for (Iterator it = requests.values().iterator(); it.hasNext();) {
446:                    entry = (RequestEntry) it.next();
447:                    if (entry.coll != null)
448:                        entry.coll.viewChange(new_view);
449:                }
450:            }
451:
452:            /**
453:             * Handles a message coming from a layer below
454:             *
455:             * @return true if the event should be forwarded further up, otherwise false (message was consumed)
456:             */
457:            public boolean receiveMessage(Message msg) {
458:                Object tmpHdr;
459:
460:                // i. If header is not an instance of request correlator header, ignore
461:                //
462:                // ii. Check whether the message was sent by a request correlator with
463:                // the same name (there may be multiple request correlators in the same
464:                // protocol stack...)
465:                tmpHdr = msg.getHeader(name);
466:                if (tmpHdr == null || !(tmpHdr instanceof  Header)) {
467:                    return true;
468:                }
469:
470:                Header hdr = (Header) tmpHdr;
471:                if (hdr.corrName == null || !hdr.corrName.equals(name)) {
472:                    if (log.isTraceEnabled()) {
473:                        log.trace(new StringBuffer(
474:                                "name of request correlator header (").append(
475:                                hdr.corrName).append(
476:                                ") is different from ours (").append(name)
477:                                .append("). Msg not accepted, passed up"));
478:                    }
479:                    return true;
480:                }
481:
482:                // If the header contains a destination list, and we are not part of it, then we discard the
483:                // request (was addressed to other members)
484:                java.util.List dests = hdr.dest_mbrs;
485:                if (dests != null && local_addr != null
486:                        && !dests.contains(local_addr)) {
487:                    if (log.isTraceEnabled()) {
488:                        log
489:                                .trace(new StringBuffer(
490:                                        "discarded request from ")
491:                                        .append(msg.getSrc())
492:                                        .append(
493:                                                " as we are not part of destination list (local_addr=")
494:                                        .append(local_addr).append(", hdr=")
495:                                        .append(hdr).append(')'));
496:                    }
497:                    return false;
498:                }
499:
500:                // [Header.REQ]:
501:                // i. If there is no request handler, discard
502:                // ii. Check whether priority: if synchronous and call stack contains
503:                // address that equals local address -> add priority request. Else
504:                // add normal request.
505:                //
506:                // [Header.RSP]:
507:                // Remove the msg request correlator header and notify the associated
508:                // <tt>RspCollector</tt> that a reply has been received
509:                switch (hdr.type) {
510:                case Header.REQ:
511:                    if (request_handler == null) {
512:                        if (log.isWarnEnabled()) {
513:                            log
514:                                    .warn("there is no request handler installed to deliver request !");
515:                        }
516:                        return false;
517:                    }
518:
519:                    if (deadlock_detection) {
520:                        if (scheduler == null) {
521:                            log
522:                                    .error("deadlock_detection is true, but scheduler is null: this is not supposed to happen"
523:                                            + " (discarding request)");
524:                            break;
525:                        }
526:
527:                        Request req = new Request(msg);
528:                        java.util.Stack stack = hdr.callStack;
529:                        if (hdr.rsp_expected && stack != null
530:                                && local_addr != null) {
531:                            if (stack.contains(local_addr)) {
532:                                if (log.isTraceEnabled())
533:                                    log
534:                                            .trace("call stack="
535:                                                    + hdr.callStack
536:                                                    + " contains "
537:                                                    + local_addr
538:                                                    + ": adding request to priority queue");
539:                                scheduler.addPrio(req);
540:                                break;
541:                            }
542:                        }
543:                        scheduler.add(req);
544:                        break;
545:                    }
546:
547:                    handleRequest(msg);
548:                    break;
549:
550:                case Header.RSP:
551:                    msg.removeHeader(name);
552:                    RspCollector coll = findEntry(hdr.id);
553:                    if (coll != null) {
554:                        Address sender = msg.getSrc();
555:                        Object retval = null;
556:                        byte[] buf = msg.getBuffer();
557:                        try {
558:                            retval = marshaller != null ? marshaller
559:                                    .objectFromByteBuffer(buf) : Util
560:                                    .objectFromByteBuffer(buf);
561:                        } catch (Exception e) {
562:                            log
563:                                    .error(
564:                                            "failed unmarshalling buffer into return value",
565:                                            e);
566:                            retval = e;
567:                        }
568:                        coll.receiveResponse(retval, sender);
569:                    }
570:                    break;
571:
572:                default:
573:                    msg.removeHeader(name);
574:                    if (log.isErrorEnabled())
575:                        log.error("header's type is neither REQ nor RSP !");
576:                    break;
577:                }
578:
579:                return false;
580:            }
581:
582:            public Address getLocalAddress() {
583:                return local_addr;
584:            }
585:
586:            public void setLocalAddress(Address local_addr) {
587:                this .local_addr = local_addr;
588:            }
589:
590:            // .......................................................................
591:
592:            /**
593:             * Add an association of:<br>
594:             * ID -> <tt>RspCollector</tt>
595:             */
596:            private void addEntry(long id, RequestEntry entry) {
597:                Long id_obj = new Long(id);
598:                synchronized (requests) {
599:                    if (!requests.containsKey(id_obj))
600:                        requests.put(id_obj, entry);
601:                    else if (log.isWarnEnabled())
602:                        log.warn("entry " + entry + " for request-id=" + id
603:                                + " already present !");
604:                }
605:            }
606:
607:            /**
608:             * Remove the request entry associated with the given ID
609:             *
610:             * @param id the id of the <tt>RequestEntry</tt> to remove
611:             */
612:            private void removeEntry(long id) {
613:                Long id_obj = new Long(id);
614:
615:                // changed by bela Feb 28 2003 (bug fix for 690606)
616:                // changed back to use synchronization by bela June 27 2003 (bug fix for #761804),
617:                // we can do this because we now copy for iteration (viewChange() and suspect())
618:                requests.remove(id_obj);
619:            }
620:
621:            /**
622:             * @param id the ID of the corresponding <tt>RspCollector</tt>
623:             *
624:             * @return the <tt>RspCollector</tt> associated with the given ID
625:             */
626:            private RspCollector findEntry(long id) {
627:                Long id_obj = new Long(id);
628:                RequestEntry entry;
629:
630:                entry = (RequestEntry) requests.get(id_obj);
631:                return ((entry != null) ? entry.coll : null);
632:            }
633:
634:            /**
635:             * Handle a request msg for this correlator
636:             *
637:             * @param req the request msg
638:             */
639:            private void handleRequest(Message req) {
640:                Object retval;
641:                byte[] rsp_buf;
642:                Header hdr, rsp_hdr;
643:                Message rsp;
644:
645:                // i. Remove the request correlator header from the msg and pass it to
646:                // the registered handler
647:                //
648:                // ii. If a reply is expected, pack the return value from the request
649:                // handler to a reply msg and send it back. The reply msg has the same
650:                // ID as the request and the name of the sender request correlator
651:                hdr = (Header) req.removeHeader(name);
652:
653:                if (log.isTraceEnabled()) {
654:                    log.trace(new StringBuffer("calling (").append(
655:                            (request_handler != null ? request_handler
656:                                    .getClass().getName() : "null")).append(
657:                            ") with request ").append(hdr.id));
658:                }
659:
660:                try {
661:                    retval = request_handler.handle(req);
662:                } catch (Throwable t) {
663:                    if (log.isErrorEnabled())
664:                        log.error("error invoking method", t);
665:                    retval = t;
666:                }
667:
668:                if (!hdr.rsp_expected) // asynchronous call, we don't need to send a response; terminate call here
669:                    return;
670:
671:                if (transport == null) {
672:                    if (log.isErrorEnabled())
673:                        log
674:                                .error("failure sending response; no transport available");
675:                    return;
676:                }
677:
678:                // changed (bela Feb 20 2004): catch exception and return exception
679:                try { // retval could be an exception, or a real value
680:                    rsp_buf = marshaller != null ? marshaller
681:                            .objectToByteBuffer(retval) : Util
682:                            .objectToByteBuffer(retval);
683:                } catch (Throwable t) {
684:                    try { // this call should succeed (all exceptions are serializable)
685:                        rsp_buf = marshaller != null ? marshaller
686:                                .objectToByteBuffer(t) : Util
687:                                .objectToByteBuffer(t);
688:                    } catch (Throwable tt) {
689:                        if (log.isErrorEnabled())
690:                            log.error("failed sending rsp: return value ("
691:                                    + retval + ") is not serializable");
692:                        return;
693:                    }
694:                }
695:
696:                rsp = req.makeReply();
697:                if (rsp_buf != null)
698:                    rsp.setBuffer(rsp_buf);
699:                rsp_hdr = new Header(Header.RSP, hdr.id, false, name);
700:                rsp.putHeader(name, rsp_hdr);
701:                if (log.isTraceEnabled())
702:                    log.trace(new StringBuffer("sending rsp for ").append(
703:                            rsp_hdr.id).append(" to ").append(rsp.getDest()));
704:
705:                try {
706:                    if (transport instanceof  Protocol)
707:                        ((Protocol) transport).passDown(new Event(Event.MSG,
708:                                rsp));
709:                    else if (transport instanceof  Transport)
710:                        ((Transport) transport).send(rsp);
711:                    else if (log.isErrorEnabled())
712:                        log.error("transport object has to be either a "
713:                                + "Transport or a Protocol, however it is a "
714:                                + transport.getClass());
715:                } catch (Throwable e) {
716:                    if (log.isErrorEnabled())
717:                        log.error("failed sending the response", e);
718:                }
719:            }
720:
721:            // .......................................................................
722:
723:            /**
724:             * Associates an ID with an <tt>RspCollector</tt>
725:             */
726:            private static class RequestEntry {
727:                public RspCollector coll;
728:
729:                public RequestEntry(RspCollector coll) {
730:                    this .coll = coll;
731:                }
732:            }
733:
734:            /**
735:             * The header for <tt>RequestCorrelator</tt> messages
736:             */
737:            public static final class Header extends org.jgroups.Header
738:                    implements  Streamable {
739:                public static final byte REQ = 0;
740:                public static final byte RSP = 1;
741:
742:                /** Type of header: request or reply */
743:                public byte type = REQ;
744:                /**
745:                 * The id of this request to distinguish among other requests from
746:                 * the same <tt>RequestCorrelator</tt> */
747:                public long id = 0;
748:
749:                /** msg is synchronous if true */
750:                public boolean rsp_expected = true;
751:
752:                /** The unique name of the associated <tt>RequestCorrelator</tt> */
753:                public String corrName = null;
754:
755:                /** Stack<Address>. Contains senders (e.g. P --> Q --> R) */
756:                public java.util.Stack callStack = null;
757:
758:                /** Contains a list of members who should receive the request (others will drop). Ignored if null */
759:                public java.util.List dest_mbrs = null;
760:
761:                /**
762:                 * Used for externalization
763:                 */
764:                public Header() {
765:                }
766:
767:                /**
768:                 * @param type type of header (<tt>REQ</tt>/<tt>RSP</tt>)
769:                 * @param id id of this header relative to ids of other requests
770:                 * originating from the same correlator
771:                 * @param rsp_expected whether it's a sync or async request
772:                 * @param name the name of the <tt>RequestCorrelator</tt> from which
773:                 */
774:                public Header(byte type, long id, boolean rsp_expected,
775:                        String name) {
776:                    this .type = type;
777:                    this .id = id;
778:                    this .rsp_expected = rsp_expected;
779:                    this .corrName = name;
780:                }
781:
782:                /**
783:                 */
784:                public String toString() {
785:                    StringBuffer ret = new StringBuffer();
786:                    ret.append("[Header: name=" + corrName + ", type=");
787:                    ret.append(type == REQ ? "REQ" : type == RSP ? "RSP"
788:                            : "<unknown>");
789:                    ret.append(", id=" + id);
790:                    ret.append(", rsp_expected=" + rsp_expected + ']');
791:                    if (callStack != null)
792:                        ret.append(", call stack=" + callStack);
793:                    if (dest_mbrs != null)
794:                        ret.append(", dest_mbrs=").append(dest_mbrs);
795:                    return ret.toString();
796:                }
797:
798:                public void writeExternal(ObjectOutput out) throws IOException {
799:                    out.writeByte(type);
800:                    out.writeLong(id);
801:                    out.writeBoolean(rsp_expected);
802:                    if (corrName != null) {
803:                        out.writeBoolean(true);
804:                        out.writeUTF(corrName);
805:                    } else {
806:                        out.writeBoolean(false);
807:                    }
808:                    out.writeObject(callStack);
809:                    out.writeObject(dest_mbrs);
810:                }
811:
812:                public void readExternal(ObjectInput in) throws IOException,
813:                        ClassNotFoundException {
814:                    type = in.readByte();
815:                    id = in.readLong();
816:                    rsp_expected = in.readBoolean();
817:                    if (in.readBoolean())
818:                        corrName = in.readUTF();
819:                    callStack = (java.util.Stack) in.readObject();
820:                    dest_mbrs = (java.util.List) in.readObject();
821:                }
822:
823:                public void writeTo(DataOutputStream out) throws IOException {
824:                    out.writeByte(type);
825:                    out.writeLong(id);
826:                    out.writeBoolean(rsp_expected);
827:
828:                    if (corrName != null) {
829:                        out.writeBoolean(true);
830:                        out.writeUTF(corrName);
831:                    } else {
832:                        out.writeBoolean(false);
833:                    }
834:
835:                    if (callStack != null) {
836:                        out.writeBoolean(true);
837:                        out.writeShort(callStack.size());
838:                        Address mbr;
839:                        for (int i = 0; i < callStack.size(); i++) {
840:                            mbr = (Address) callStack.elementAt(i);
841:                            Util.writeAddress(mbr, out);
842:                        }
843:                    } else {
844:                        out.writeBoolean(false);
845:                    }
846:
847:                    Util.writeAddresses(dest_mbrs, out);
848:                }
849:
850:                public void readFrom(DataInputStream in) throws IOException,
851:                        IllegalAccessException, InstantiationException {
852:                    boolean present;
853:                    type = in.readByte();
854:                    id = in.readLong();
855:                    rsp_expected = in.readBoolean();
856:
857:                    present = in.readBoolean();
858:                    if (present)
859:                        corrName = in.readUTF();
860:
861:                    present = in.readBoolean();
862:                    if (present) {
863:                        callStack = new Stack();
864:                        short len = in.readShort();
865:                        Address tmp;
866:                        for (short i = 0; i < len; i++) {
867:                            tmp = Util.readAddress(in);
868:                            callStack.add(tmp);
869:                        }
870:                    }
871:
872:                    dest_mbrs = (List) Util.readAddresses(in,
873:                            java.util.LinkedList.class);
874:                }
875:
876:                public long size() {
877:                    long retval = Global.BYTE_SIZE // type
878:                            + Global.LONG_SIZE // id
879:                            + Global.BYTE_SIZE; // rsp_expected
880:
881:                    retval += Global.BYTE_SIZE; // presence for corrName
882:                    if (corrName != null)
883:                        retval += corrName.length() + 2; // UTF
884:
885:                    retval += Global.BYTE_SIZE; // presence
886:                    if (callStack != null) {
887:                        retval += Global.SHORT_SIZE; // number of elements
888:                        if (callStack.size() > 0) {
889:                            Address mbr = (Address) callStack.firstElement();
890:                            retval += callStack.size() * (Util.size(mbr));
891:                        }
892:                    }
893:
894:                    retval += Util.size(dest_mbrs);
895:                    return retval;
896:                }
897:
898:            }
899:
900:            /**
901:             * Listens for scheduler events and sets the current call chain (stack)
902:             * whenever a thread is started, or a suspended thread resumed. Does
903:             * this only for synchronous requests (<code>Runnable</code> is actually
904:             * a <code>Request</code>).
905:             */
906:            private class CallStackSetter implements  SchedulerListener {
907:                public void started(ReusableThread rt, Runnable r) {
908:                    setCallStack(rt, r);
909:                }
910:
911:                public void stopped(ReusableThread rt, Runnable r) {
912:                }
913:
914:                public void suspended(ReusableThread rt, Runnable r) {
915:                }
916:
917:                public void resumed(ReusableThread rt, Runnable r) {
918:                    setCallStack(rt, r);
919:                }
920:
921:                void setCallStack(ReusableThread rt, Runnable r) {
922:                    Message req;
923:                    Header hdr;
924:                    Object obj;
925:
926:                    req = ((Request) r).req;
927:                    if (req == null)
928:                        return;
929:
930:                    obj = req.getHeader(name);
931:                    if (obj == null || !(obj instanceof  Header))
932:                        return;
933:
934:                    hdr = (Header) obj;
935:                    if (hdr.rsp_expected == false)
936:                        return;
937:
938:                    final java.util.Stack new_stack = (java.util.Stack) hdr.callStack
939:                            .clone();
940:                    if (new_stack != null)
941:                        rt.assignThreadLocalListener(new ThreadLocalListener() {
942:                            public void setThreadLocal() {
943:                                call_stack.set(new_stack);
944:                            }
945:
946:                            public void resetThreadLocal() {
947:                                call_stack.set(null);
948:                            }
949:                        });
950:                }
951:            }
952:
953:            /**
954:             * The runnable for an incoming request which is submitted to the
955:             * dispatcher
956:             */
957:            private class Request implements  Runnable {
958:                public final Message req;
959:
960:                public Request(Message req) {
961:                    this .req = req;
962:                }
963:
964:                public void run() {
965:                    handleRequest(req);
966:                }
967:
968:                public String toString() {
969:                    StringBuffer sb = new StringBuffer();
970:                    if (req != null)
971:                        sb.append("req=" + req + ", headers="
972:                                + req.printObjectHeaders());
973:                    return sb.toString();
974:                }
975:            }
976:
977:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.