001: // $Id: RequestCorrelator.java,v 1.30.2.5 2007/04/23 10:15:57 belaban Exp $
002:
003: package org.jgroups.blocks;
004:
005: import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
006: import org.apache.commons.logging.Log;
007: import org.apache.commons.logging.LogFactory;
008: import org.jgroups.*;
009: import org.jgroups.stack.Protocol;
010: import org.jgroups.util.ReusableThread;
011: import org.jgroups.util.Scheduler;
012: import org.jgroups.util.SchedulerListener;
013: import org.jgroups.util.Streamable;
014: import org.jgroups.util.ThreadLocalListener;
015: import org.jgroups.util.Util;
016:
017: import java.io.*;
018: import java.util.*;
019:
020: /**
021: * Framework to send requests and receive matching responses (matching on
022: * request ID).
023: * Multiple requests can be sent at a time. Whenever a response is received,
024: * the correct <code>RspCollector</code> is looked up (key = id) and its
025: * method <code>receiveResponse()</code> invoked. A caller may use
026: * <code>done()</code> to signal that no more responses are expected, and that
027: * the corresponding entry may be removed.
028: * <p>
029: * <code>RequestCorrelator</code> can be installed at both client and server
030: * sides, it can also switch roles dynamically; i.e., send a request and at
031: * the same time process an incoming request (when local delivery is enabled,
032: * this is actually the default).
033: * <p>
034: *
035: * @author Bela Ban
036: */
037: public class RequestCorrelator {
038:
/** The protocol layer to use to pass up/down messages. Can be either a Protocol or a Transport */
protected Object transport = null;

/** The table of pending requests (keys=Long (request IDs), values=<tt>RequestEntry</tt>) */
protected final Map requests = new ConcurrentReaderHashMap();

/**
 * The handler for the incoming requests. It is invoked either directly from
 * <code>receiveMessage()</code> or, when deadlock detection is enabled, from a
 * scheduler thread.
 */
protected RequestHandler request_handler = null;

/** Possibility for an external marshaller to marshal/unmarshal responses */
protected RpcDispatcher.Marshaller marshaller = null;

/** Makes the instance unique (together with IDs); also used as the key of the message header */
protected String name = null;

/** The dispatching thread pool; only created while deadlock detection is enabled */
protected Scheduler scheduler = null;

/** The address of this group member */
protected Address local_addr = null;

/**
 * This field is used only if deadlock detection is enabled.
 * In case of nested synchronous requests, it holds a list of the
 * addresses of the senders with the address at the bottom being the
 * address of the first caller
 */
protected ThreadLocal call_stack = new ThreadLocal();

/** Whether or not to perform deadlock detection for synchronous (potentially recursive) group method invocations.
 * If on, we use a scheduler (handling a priority queue), otherwise we don't and call handleRequest() directly.
 */
protected boolean deadlock_detection = false;

/**
 * This field is used only if deadlock detection is enabled.
 * It sets the calling stack to the currently running request
 */
private CallStackSetter call_stack_setter = null;

/** Process items on the queue concurrently (Scheduler). The default is to wait until the processing of an item
 * has completed before fetching the next item from the queue. Note that setting this to true
 * may destroy the properties of a protocol stack, e.g total or causal order may not be
 * guaranteed. Set this to true only if you know what you're doing ! */
protected boolean concurrent_processing = false;

/** Set to true by <code>start()</code> and back to false by <code>stop()</code> */
protected boolean started = false;

/** Logger shared by all correlator instances */
protected static final Log log = LogFactory
        .getLog(RequestCorrelator.class);
089:
/**
 * Creates a correlator with deadlock detection and concurrent processing
 * switched off and no local address, then starts it.
 *
 * @param name Used to differentiate between different RequestCorrelators
 * (e.g. in different protocol layers). Has to be unique if multiple
 * request correlators are used.
 *
 * @param transport Used to send/pass up requests. Can be either a Transport (only send() will be
 * used then), or a Protocol (passUp()/passDown() will be used)
 *
 * @param handler Request handler. Method <code>handle(Message)</code>
 * will be called when a request is received.
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler) {
    // Same end state as assigning the three fields and calling start():
    // the remaining arguments are the field defaults.
    this(name, transport, handler, false, null, false);
}
112:
/**
 * Creates a correlator that knows its own address up front; deadlock
 * detection and concurrent processing stay switched off. The correlator
 * is started before the constructor returns.
 *
 * @param name unique name of this correlator
 * @param transport a Transport or a Protocol used to send messages
 * @param handler handler for incoming requests
 * @param local_addr the address of this group member
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler, Address local_addr) {
    this(name, transport, handler, false, local_addr, false);
}
121:
/**
 * Creates and starts a correlator with an explicit deadlock detection
 * setting; no local address is set and concurrent processing is off.
 *
 * @param name Used to differentiate between different RequestCorrelators
 * (e.g. in different protocol layers). Has to be unique if multiple
 * request correlators are used.
 *
 * @param transport Used to send/pass up requests. Can be either a Transport (only send() will be
 * used then), or a Protocol (passUp()/passDown() will be used)
 *
 * @param handler Request handler. Method <code>handle(Message)</code>
 * will be called when a request is received.
 *
 * @param deadlock_detection When enabled (true) recursive synchronous
 * message calls will be detected and processed with higher priority in
 * order to solve deadlocks. Slows down processing a little bit when
 * enabled due to runtime checks involved.
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler, boolean deadlock_detection) {
    this(name, transport, handler, deadlock_detection, null, false);
}
150:
/**
 * Creates and starts a correlator with explicit deadlock detection and
 * concurrent processing settings; no local address is set.
 *
 * @param name unique name of this correlator
 * @param transport a Transport or a Protocol used to send messages
 * @param handler handler for incoming requests
 * @param deadlock_detection whether requests are dispatched through the scheduler
 * @param concurrent_processing passed to the scheduler when one is created
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler, boolean deadlock_detection,
        boolean concurrent_processing) {
    this(name, transport, handler, deadlock_detection, null,
            concurrent_processing);
}
161:
/**
 * Creates and starts a correlator with an explicit deadlock detection
 * setting and a known local address; concurrent processing is off.
 *
 * @param name unique name of this correlator
 * @param transport a Transport or a Protocol used to send messages
 * @param handler handler for incoming requests
 * @param deadlock_detection whether requests are dispatched through the scheduler
 * @param local_addr the address of this group member
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler, boolean deadlock_detection,
        Address local_addr) {
    this(name, transport, handler, deadlock_detection, local_addr, false);
}
172:
/**
 * Creates a fully configured correlator and starts it.
 *
 * @param name unique name of this correlator
 * @param transport a Transport or a Protocol used to send messages
 * @param handler handler for incoming requests
 * @param deadlock_detection whether requests are dispatched through the scheduler
 * @param local_addr the address of this group member
 * @param concurrent_processing passed to the scheduler when one is created
 */
public RequestCorrelator(String name, Object transport,
        RequestHandler handler, boolean deadlock_detection,
        Address local_addr, boolean concurrent_processing) {
    // identity and wiring
    this.name = name;
    this.transport = transport;
    this.request_handler = handler;
    this.local_addr = local_addr;

    // dispatching options; they must be in place before start() reads them
    this.deadlock_detection = deadlock_detection;
    this.concurrent_processing = concurrent_processing;

    start();
}
184:
185: /**
186: * Switch the deadlock detection mechanism on/off
187: * @param flag the deadlock detection flag
188: */
189: public void setDeadlockDetection(boolean flag) {
190: if (deadlock_detection != flag) { // only set it if different
191: deadlock_detection = flag;
192: if (started) {
193: if (deadlock_detection) {
194: startScheduler();
195: } else {
196: stopScheduler();
197: }
198: }
199: }
200: }
201:
202: public void setRequestHandler(RequestHandler handler) {
203: request_handler = handler;
204: start();
205: }
206:
207: public void setConcurrentProcessing(boolean flag) {
208: this .concurrent_processing = flag;
209: if (deadlock_detection && scheduler != null) { // scheduler should never be null if deadlock_detection is true
210: scheduler.setConcurrentProcessing(flag);
211: }
212: }
213:
214: /**
215: * Helper method for {@link #sendRequest(long,List,Message,RspCollector)}.
216: */
217: public void sendRequest(long id, Message msg, RspCollector coll)
218: throws Exception {
219: sendRequest(id, null, msg, coll);
220: }
221:
222: public RpcDispatcher.Marshaller getMarshaller() {
223: return marshaller;
224: }
225:
226: public void setMarshaller(RpcDispatcher.Marshaller marshaller) {
227: this .marshaller = marshaller;
228: }
229:
230: public void sendRequest(long id, List dest_mbrs, Message msg,
231: RspCollector coll) throws Exception {
232: sendRequest(id, dest_mbrs, msg, coll, false);
233: }
234:
235: /**
236: * Send a request to a group. If no response collector is given, no
237: * responses are expected (making the call asynchronous).
238: *
239: * @param id The request ID. Must be unique for this JVM (e.g. current
240: * time in millisecs)
241: * @param dest_mbrs The list of members who should receive the call. Usually a group RPC
242: * is sent via multicast, but a receiver drops the request if its own address
243: * is not in this list. Will not be used if it is null.
244: * @param msg The request to be sent. The body of the message carries
245: * the request data
246: *
247: * @param coll A response collector (usually the object that invokes
248: * this method). Its methods <code>receiveResponse()</code> and
249: * <code>suspect()</code> will be invoked when a message has been received
250: * or a member is suspected, respectively.
251: */
252: public void sendRequest(long id, List dest_mbrs, Message msg,
253: RspCollector coll, boolean use_anycasting) throws Exception {
254: Header hdr;
255:
256: if (transport == null) {
257: if (log.isWarnEnabled())
258: log.warn("transport is not available !");
259: return;
260: }
261:
262: // i. Create the request correlator header and add it to the
263: // msg
264: // ii. If a reply is expected (sync call / 'coll != null'), add a
265: // coresponding entry in the pending requests table
266: // iii. If deadlock detection is enabled, set/update the call stack
267: // iv. Pass the msg down to the protocol layer below
268: hdr = new Header(Header.REQ, id, (coll != null), name);
269: hdr.dest_mbrs = dest_mbrs;
270:
271: if (coll != null) {
272: if (deadlock_detection) {
273: if (local_addr == null) {
274: if (log.isErrorEnabled())
275: log.error("local address is null !");
276: return;
277: }
278: java.util.Stack local_call_stack = (java.util.Stack) call_stack
279: .get();
280: java.util.Stack new_call_stack = local_call_stack != null ? (java.util.Stack) local_call_stack
281: .clone()
282: : new java.util.Stack();
283: new_call_stack.push(local_addr);
284: hdr.callStack = new_call_stack;
285: if (log.isTraceEnabled()) {
286: log.trace(new StringBuffer("call stack=").append(
287: hdr.callStack).append(" set for request ")
288: .append(hdr.id));
289: }
290: }
291: addEntry(hdr.id, new RequestEntry(coll));
292: }
293: msg.putHeader(name, hdr);
294:
295: if (transport instanceof Protocol) {
296: if (use_anycasting) {
297: Message copy;
298: for (Iterator it = dest_mbrs.iterator(); it.hasNext();) {
299: Address mbr = (Address) it.next();
300: copy = msg.copy(true);
301: copy.setDest(mbr);
302: ((Protocol) transport).passDown(new Event(
303: Event.MSG, copy));
304: }
305: } else {
306: ((Protocol) transport).passDown(new Event(Event.MSG,
307: msg));
308: }
309: } else if (transport instanceof Transport) {
310: if (use_anycasting) {
311: Message copy;
312: for (Iterator it = dest_mbrs.iterator(); it.hasNext();) {
313: Address mbr = (Address) it.next();
314: copy = msg.copy(true);
315: copy.setDest(mbr);
316: ((Transport) transport).send(copy);
317: }
318: } else {
319: ((Transport) transport).send(msg);
320: }
321: } else
322: throw new IllegalStateException(
323: "transport has to be either a Transport or a Protocol, however it is a "
324: + transport.getClass());
325: }
326:
327: /**
328: * Used to signal that a certain request may be garbage collected as
329: * all responses have been received.
330: */
331: public void done(long id) {
332: removeEntry(id);
333: }
334:
335: /**
336: * <b>Callback</b>.
337: * <p>
338: * Called by the protocol below when a message has been received. The
339: * algorithm should test whether the message is destined for us and,
340: * if not, pass it up to the next layer. Otherwise, it should remove
341: * the header and check whether the message is a request or response.
342: * In the first case, the message will be delivered to the request
343: * handler registered (calling its <code>handle()</code> method), in the
344: * second case, the corresponding response collector is looked up and
345: * the message delivered.
346: */
347: public void receive(Event evt) {
348: switch (evt.getType()) {
349: case Event.SUSPECT: // don't wait for responses from faulty members
350: receiveSuspect((Address) evt.getArg());
351: break;
352: case Event.VIEW_CHANGE: // adjust number of responses to wait for
353: receiveView((View) evt.getArg());
354: break;
355:
356: case Event.SET_LOCAL_ADDRESS:
357: setLocalAddress((Address) evt.getArg());
358: break;
359: case Event.MSG:
360: if (!receiveMessage((Message) evt.getArg()))
361: return;
362: break;
363: }
364: if (transport instanceof Protocol)
365: ((Protocol) transport).passUp(evt);
366: else if (log.isErrorEnabled())
367: log.error("we do not pass up messages via Transport");
368: }
369:
370: /**
371: */
372: public final void start() {
373: if (deadlock_detection) {
374: startScheduler();
375: }
376: started = true;
377: }
378:
379: public void stop() {
380: stopScheduler();
381: started = false;
382: }
383:
384: void startScheduler() {
385: if (scheduler == null) {
386: scheduler = new Scheduler();
387: if (deadlock_detection && call_stack_setter == null) {
388: call_stack_setter = new CallStackSetter();
389: scheduler.setListener(call_stack_setter);
390: }
391: if (concurrent_processing)
392: scheduler
393: .setConcurrentProcessing(concurrent_processing);
394: scheduler.start();
395: }
396: }
397:
398: void stopScheduler() {
399: if (scheduler != null) {
400: scheduler.stop();
401: scheduler = null;
402: }
403: }
404:
405: // .......................................................................
406:
407: /**
408: * <tt>Event.SUSPECT</tt> event received from a layer below.
409: * <p>
410: * All response collectors currently registered will
411: * be notified that <code>mbr</code> may have crashed, so they won't
412: * wait for its response.
413: */
414: public void receiveSuspect(Address mbr) {
415: RequestEntry entry;
416: // ArrayList copy;
417:
418: if (mbr == null)
419: return;
420: if (log.isDebugEnabled())
421: log.debug("suspect=" + mbr);
422:
423: // copy so we don't run into bug #761804 - Bela June 27 2003
424: // copy=new ArrayList(requests.values()); // removed because ConcurrentReaderHashMap can tolerate concurrent mods (bela May 8 2006)
425: for (Iterator it = requests.values().iterator(); it.hasNext();) {
426: entry = (RequestEntry) it.next();
427: if (entry.coll != null)
428: entry.coll.suspect(mbr);
429: }
430: }
431:
432: /**
433: * <tt>Event.VIEW_CHANGE</tt> event received from a layer below.
434: * <p>
435: * Mark all responses from members that are not in new_view as
436: * NOT_RECEIVED.
437: *
438: */
439: public void receiveView(View new_view) {
440: RequestEntry entry;
441: // ArrayList copy;
442:
443: // copy so we don't run into bug #761804 - Bela June 27 2003
444: // copy=new ArrayList(requests.values()); // removed because ConcurrentReaderHashMap can tolerate concurrent mods (bela May 8 2006)
445: for (Iterator it = requests.values().iterator(); it.hasNext();) {
446: entry = (RequestEntry) it.next();
447: if (entry.coll != null)
448: entry.coll.viewChange(new_view);
449: }
450: }
451:
452: /**
453: * Handles a message coming from a layer below
454: *
455: * @return true if the event should be forwarded further up, otherwise false (message was consumed)
456: */
457: public boolean receiveMessage(Message msg) {
458: Object tmpHdr;
459:
460: // i. If header is not an instance of request correlator header, ignore
461: //
462: // ii. Check whether the message was sent by a request correlator with
463: // the same name (there may be multiple request correlators in the same
464: // protocol stack...)
465: tmpHdr = msg.getHeader(name);
466: if (tmpHdr == null || !(tmpHdr instanceof Header)) {
467: return true;
468: }
469:
470: Header hdr = (Header) tmpHdr;
471: if (hdr.corrName == null || !hdr.corrName.equals(name)) {
472: if (log.isTraceEnabled()) {
473: log.trace(new StringBuffer(
474: "name of request correlator header (").append(
475: hdr.corrName).append(
476: ") is different from ours (").append(name)
477: .append("). Msg not accepted, passed up"));
478: }
479: return true;
480: }
481:
482: // If the header contains a destination list, and we are not part of it, then we discard the
483: // request (was addressed to other members)
484: java.util.List dests = hdr.dest_mbrs;
485: if (dests != null && local_addr != null
486: && !dests.contains(local_addr)) {
487: if (log.isTraceEnabled()) {
488: log
489: .trace(new StringBuffer(
490: "discarded request from ")
491: .append(msg.getSrc())
492: .append(
493: " as we are not part of destination list (local_addr=")
494: .append(local_addr).append(", hdr=")
495: .append(hdr).append(')'));
496: }
497: return false;
498: }
499:
500: // [Header.REQ]:
501: // i. If there is no request handler, discard
502: // ii. Check whether priority: if synchronous and call stack contains
503: // address that equals local address -> add priority request. Else
504: // add normal request.
505: //
506: // [Header.RSP]:
507: // Remove the msg request correlator header and notify the associated
508: // <tt>RspCollector</tt> that a reply has been received
509: switch (hdr.type) {
510: case Header.REQ:
511: if (request_handler == null) {
512: if (log.isWarnEnabled()) {
513: log
514: .warn("there is no request handler installed to deliver request !");
515: }
516: return false;
517: }
518:
519: if (deadlock_detection) {
520: if (scheduler == null) {
521: log
522: .error("deadlock_detection is true, but scheduler is null: this is not supposed to happen"
523: + " (discarding request)");
524: break;
525: }
526:
527: Request req = new Request(msg);
528: java.util.Stack stack = hdr.callStack;
529: if (hdr.rsp_expected && stack != null
530: && local_addr != null) {
531: if (stack.contains(local_addr)) {
532: if (log.isTraceEnabled())
533: log
534: .trace("call stack="
535: + hdr.callStack
536: + " contains "
537: + local_addr
538: + ": adding request to priority queue");
539: scheduler.addPrio(req);
540: break;
541: }
542: }
543: scheduler.add(req);
544: break;
545: }
546:
547: handleRequest(msg);
548: break;
549:
550: case Header.RSP:
551: msg.removeHeader(name);
552: RspCollector coll = findEntry(hdr.id);
553: if (coll != null) {
554: Address sender = msg.getSrc();
555: Object retval = null;
556: byte[] buf = msg.getBuffer();
557: try {
558: retval = marshaller != null ? marshaller
559: .objectFromByteBuffer(buf) : Util
560: .objectFromByteBuffer(buf);
561: } catch (Exception e) {
562: log
563: .error(
564: "failed unmarshalling buffer into return value",
565: e);
566: retval = e;
567: }
568: coll.receiveResponse(retval, sender);
569: }
570: break;
571:
572: default:
573: msg.removeHeader(name);
574: if (log.isErrorEnabled())
575: log.error("header's type is neither REQ nor RSP !");
576: break;
577: }
578:
579: return false;
580: }
581:
582: public Address getLocalAddress() {
583: return local_addr;
584: }
585:
586: public void setLocalAddress(Address local_addr) {
587: this .local_addr = local_addr;
588: }
589:
590: // .......................................................................
591:
592: /**
593: * Add an association of:<br>
594: * ID -> <tt>RspCollector</tt>
595: */
596: private void addEntry(long id, RequestEntry entry) {
597: Long id_obj = new Long(id);
598: synchronized (requests) {
599: if (!requests.containsKey(id_obj))
600: requests.put(id_obj, entry);
601: else if (log.isWarnEnabled())
602: log.warn("entry " + entry + " for request-id=" + id
603: + " already present !");
604: }
605: }
606:
607: /**
608: * Remove the request entry associated with the given ID
609: *
610: * @param id the id of the <tt>RequestEntry</tt> to remove
611: */
612: private void removeEntry(long id) {
613: Long id_obj = new Long(id);
614:
615: // changed by bela Feb 28 2003 (bug fix for 690606)
616: // changed back to use synchronization by bela June 27 2003 (bug fix for #761804),
617: // we can do this because we now copy for iteration (viewChange() and suspect())
618: requests.remove(id_obj);
619: }
620:
621: /**
622: * @param id the ID of the corresponding <tt>RspCollector</tt>
623: *
624: * @return the <tt>RspCollector</tt> associated with the given ID
625: */
626: private RspCollector findEntry(long id) {
627: Long id_obj = new Long(id);
628: RequestEntry entry;
629:
630: entry = (RequestEntry) requests.get(id_obj);
631: return ((entry != null) ? entry.coll : null);
632: }
633:
634: /**
635: * Handle a request msg for this correlator
636: *
637: * @param req the request msg
638: */
639: private void handleRequest(Message req) {
640: Object retval;
641: byte[] rsp_buf;
642: Header hdr, rsp_hdr;
643: Message rsp;
644:
645: // i. Remove the request correlator header from the msg and pass it to
646: // the registered handler
647: //
648: // ii. If a reply is expected, pack the return value from the request
649: // handler to a reply msg and send it back. The reply msg has the same
650: // ID as the request and the name of the sender request correlator
651: hdr = (Header) req.removeHeader(name);
652:
653: if (log.isTraceEnabled()) {
654: log.trace(new StringBuffer("calling (").append(
655: (request_handler != null ? request_handler
656: .getClass().getName() : "null")).append(
657: ") with request ").append(hdr.id));
658: }
659:
660: try {
661: retval = request_handler.handle(req);
662: } catch (Throwable t) {
663: if (log.isErrorEnabled())
664: log.error("error invoking method", t);
665: retval = t;
666: }
667:
668: if (!hdr.rsp_expected) // asynchronous call, we don't need to send a response; terminate call here
669: return;
670:
671: if (transport == null) {
672: if (log.isErrorEnabled())
673: log
674: .error("failure sending response; no transport available");
675: return;
676: }
677:
678: // changed (bela Feb 20 2004): catch exception and return exception
679: try { // retval could be an exception, or a real value
680: rsp_buf = marshaller != null ? marshaller
681: .objectToByteBuffer(retval) : Util
682: .objectToByteBuffer(retval);
683: } catch (Throwable t) {
684: try { // this call should succeed (all exceptions are serializable)
685: rsp_buf = marshaller != null ? marshaller
686: .objectToByteBuffer(t) : Util
687: .objectToByteBuffer(t);
688: } catch (Throwable tt) {
689: if (log.isErrorEnabled())
690: log.error("failed sending rsp: return value ("
691: + retval + ") is not serializable");
692: return;
693: }
694: }
695:
696: rsp = req.makeReply();
697: if (rsp_buf != null)
698: rsp.setBuffer(rsp_buf);
699: rsp_hdr = new Header(Header.RSP, hdr.id, false, name);
700: rsp.putHeader(name, rsp_hdr);
701: if (log.isTraceEnabled())
702: log.trace(new StringBuffer("sending rsp for ").append(
703: rsp_hdr.id).append(" to ").append(rsp.getDest()));
704:
705: try {
706: if (transport instanceof Protocol)
707: ((Protocol) transport).passDown(new Event(Event.MSG,
708: rsp));
709: else if (transport instanceof Transport)
710: ((Transport) transport).send(rsp);
711: else if (log.isErrorEnabled())
712: log.error("transport object has to be either a "
713: + "Transport or a Protocol, however it is a "
714: + transport.getClass());
715: } catch (Throwable e) {
716: if (log.isErrorEnabled())
717: log.error("failed sending the response", e);
718: }
719: }
720:
721: // .......................................................................
722:
723: /**
724: * Associates an ID with an <tt>RspCollector</tt>
725: */
726: private static class RequestEntry {
727: public RspCollector coll;
728:
729: public RequestEntry(RspCollector coll) {
730: this .coll = coll;
731: }
732: }
733:
734: /**
735: * The header for <tt>RequestCorrelator</tt> messages
736: */
737: public static final class Header extends org.jgroups.Header
738: implements Streamable {
739: public static final byte REQ = 0;
740: public static final byte RSP = 1;
741:
742: /** Type of header: request or reply */
743: public byte type = REQ;
744: /**
745: * The id of this request to distinguish among other requests from
746: * the same <tt>RequestCorrelator</tt> */
747: public long id = 0;
748:
749: /** msg is synchronous if true */
750: public boolean rsp_expected = true;
751:
752: /** The unique name of the associated <tt>RequestCorrelator</tt> */
753: public String corrName = null;
754:
755: /** Stack<Address>. Contains senders (e.g. P --> Q --> R) */
756: public java.util.Stack callStack = null;
757:
758: /** Contains a list of members who should receive the request (others will drop). Ignored if null */
759: public java.util.List dest_mbrs = null;
760:
761: /**
762: * Used for externalization
763: */
764: public Header() {
765: }
766:
767: /**
768: * @param type type of header (<tt>REQ</tt>/<tt>RSP</tt>)
769: * @param id id of this header relative to ids of other requests
770: * originating from the same correlator
771: * @param rsp_expected whether it's a sync or async request
772: * @param name the name of the <tt>RequestCorrelator</tt> from which
773: */
774: public Header(byte type, long id, boolean rsp_expected,
775: String name) {
776: this .type = type;
777: this .id = id;
778: this .rsp_expected = rsp_expected;
779: this .corrName = name;
780: }
781:
782: /**
783: */
784: public String toString() {
785: StringBuffer ret = new StringBuffer();
786: ret.append("[Header: name=" + corrName + ", type=");
787: ret.append(type == REQ ? "REQ" : type == RSP ? "RSP"
788: : "<unknown>");
789: ret.append(", id=" + id);
790: ret.append(", rsp_expected=" + rsp_expected + ']');
791: if (callStack != null)
792: ret.append(", call stack=" + callStack);
793: if (dest_mbrs != null)
794: ret.append(", dest_mbrs=").append(dest_mbrs);
795: return ret.toString();
796: }
797:
798: public void writeExternal(ObjectOutput out) throws IOException {
799: out.writeByte(type);
800: out.writeLong(id);
801: out.writeBoolean(rsp_expected);
802: if (corrName != null) {
803: out.writeBoolean(true);
804: out.writeUTF(corrName);
805: } else {
806: out.writeBoolean(false);
807: }
808: out.writeObject(callStack);
809: out.writeObject(dest_mbrs);
810: }
811:
812: public void readExternal(ObjectInput in) throws IOException,
813: ClassNotFoundException {
814: type = in.readByte();
815: id = in.readLong();
816: rsp_expected = in.readBoolean();
817: if (in.readBoolean())
818: corrName = in.readUTF();
819: callStack = (java.util.Stack) in.readObject();
820: dest_mbrs = (java.util.List) in.readObject();
821: }
822:
823: public void writeTo(DataOutputStream out) throws IOException {
824: out.writeByte(type);
825: out.writeLong(id);
826: out.writeBoolean(rsp_expected);
827:
828: if (corrName != null) {
829: out.writeBoolean(true);
830: out.writeUTF(corrName);
831: } else {
832: out.writeBoolean(false);
833: }
834:
835: if (callStack != null) {
836: out.writeBoolean(true);
837: out.writeShort(callStack.size());
838: Address mbr;
839: for (int i = 0; i < callStack.size(); i++) {
840: mbr = (Address) callStack.elementAt(i);
841: Util.writeAddress(mbr, out);
842: }
843: } else {
844: out.writeBoolean(false);
845: }
846:
847: Util.writeAddresses(dest_mbrs, out);
848: }
849:
850: public void readFrom(DataInputStream in) throws IOException,
851: IllegalAccessException, InstantiationException {
852: boolean present;
853: type = in.readByte();
854: id = in.readLong();
855: rsp_expected = in.readBoolean();
856:
857: present = in.readBoolean();
858: if (present)
859: corrName = in.readUTF();
860:
861: present = in.readBoolean();
862: if (present) {
863: callStack = new Stack();
864: short len = in.readShort();
865: Address tmp;
866: for (short i = 0; i < len; i++) {
867: tmp = Util.readAddress(in);
868: callStack.add(tmp);
869: }
870: }
871:
872: dest_mbrs = (List) Util.readAddresses(in,
873: java.util.LinkedList.class);
874: }
875:
876: public long size() {
877: long retval = Global.BYTE_SIZE // type
878: + Global.LONG_SIZE // id
879: + Global.BYTE_SIZE; // rsp_expected
880:
881: retval += Global.BYTE_SIZE; // presence for corrName
882: if (corrName != null)
883: retval += corrName.length() + 2; // UTF
884:
885: retval += Global.BYTE_SIZE; // presence
886: if (callStack != null) {
887: retval += Global.SHORT_SIZE; // number of elements
888: if (callStack.size() > 0) {
889: Address mbr = (Address) callStack.firstElement();
890: retval += callStack.size() * (Util.size(mbr));
891: }
892: }
893:
894: retval += Util.size(dest_mbrs);
895: return retval;
896: }
897:
898: }
899:
900: /**
901: * Listens for scheduler events and sets the current call chain (stack)
902: * whenever a thread is started, or a suspended thread resumed. Does
903: * this only for synchronous requests (<code>Runnable</code> is actually
904: * a <code>Request</code>).
905: */
906: private class CallStackSetter implements SchedulerListener {
907: public void started(ReusableThread rt, Runnable r) {
908: setCallStack(rt, r);
909: }
910:
911: public void stopped(ReusableThread rt, Runnable r) {
912: }
913:
914: public void suspended(ReusableThread rt, Runnable r) {
915: }
916:
917: public void resumed(ReusableThread rt, Runnable r) {
918: setCallStack(rt, r);
919: }
920:
921: void setCallStack(ReusableThread rt, Runnable r) {
922: Message req;
923: Header hdr;
924: Object obj;
925:
926: req = ((Request) r).req;
927: if (req == null)
928: return;
929:
930: obj = req.getHeader(name);
931: if (obj == null || !(obj instanceof Header))
932: return;
933:
934: hdr = (Header) obj;
935: if (hdr.rsp_expected == false)
936: return;
937:
938: final java.util.Stack new_stack = (java.util.Stack) hdr.callStack
939: .clone();
940: if (new_stack != null)
941: rt.assignThreadLocalListener(new ThreadLocalListener() {
942: public void setThreadLocal() {
943: call_stack.set(new_stack);
944: }
945:
946: public void resetThreadLocal() {
947: call_stack.set(null);
948: }
949: });
950: }
951: }
952:
953: /**
954: * The runnable for an incoming request which is submitted to the
955: * dispatcher
956: */
957: private class Request implements Runnable {
958: public final Message req;
959:
960: public Request(Message req) {
961: this .req = req;
962: }
963:
964: public void run() {
965: handleRequest(req);
966: }
967:
968: public String toString() {
969: StringBuffer sb = new StringBuffer();
970: if (req != null)
971: sb.append("req=" + req + ", headers="
972: + req.printObjectHeaders());
973: return sb.toString();
974: }
975: }
976:
977: }
|