001: package org.jgroups.blocks;
002:
003: import org.apache.commons.logging.Log;
004: import org.apache.commons.logging.LogFactory;
005: import org.jgroups.*;
006: import org.jgroups.stack.Protocol;
007: import org.jgroups.stack.StateTransferInfo;
008: import org.jgroups.util.*;
009:
010: import java.io.InputStream;
011: import java.io.OutputStream;
012: import java.io.Serializable;
013: import java.util.Vector;
014: import java.util.Collection;
015: import java.util.TreeSet;
016: import java.util.ArrayList;
017:
018: /**
019: * Provides synchronous and asynchronous message sending with request-response
020: * correlation; i.e., matching responses with the original request.
021: * It also offers push-style message reception (by internally using the PullPushAdapter).
022: * <p>
* Channels are simple patterns to asynchronously send and receive messages.
024: * However, a significant number of communication patterns in group communication
025: * require synchronous communication. For example, a sender would like to send a
026: * message to the group and wait for all responses. Or another application would
027: * like to send a message to the group and wait only until the majority of the
028: * receivers have sent a response, or until a timeout occurred. MessageDispatcher
029: * offers a combination of the above pattern with other patterns.
030: * <p>
031: * Used on top of channel to implement group requests. Client's <code>handle()</code>
032: * method is called when request is received. Is the equivalent of RpcProtocol on
033: * the application instead of protocol level.
034: *
035: * @author Bela Ban
036: * @version $Id: MessageDispatcher.java,v 1.60.2.3 2007/03/08 10:14:45 belaban Exp $
037: */
038: public class MessageDispatcher implements RequestHandler {
039: protected Channel channel = null;
040: protected RequestCorrelator corr = null;
041: protected MessageListener msg_listener = null;
042: protected MembershipListener membership_listener = null;
043: protected RequestHandler req_handler = null;
044: protected ProtocolAdapter prot_adapter = null;
045: protected TransportAdapter transport_adapter = null;
046: protected final Collection members = new TreeSet();
047: protected Address local_addr = null;
048: protected boolean deadlock_detection = false;
049: protected PullPushAdapter adapter = null;
050: protected PullPushHandler handler = null;
051: protected Serializable id = null;
052: protected final Log log = LogFactory.getLog(getClass());
053:
054: /**
055: * Process items on the queue concurrently (RequestCorrelator). The default is to wait until the processing of an
056: * item has completed before fetching the next item from the queue. Note that setting this to true may destroy the
057: * properties of a protocol stack, e.g total or causal order may not be guaranteed. Set this to true only if you
058: * know what you're doing !
059: */
060: protected boolean concurrent_processing = false;
061:
/**
 * Creates and starts a dispatcher on top of <code>channel</code>, with deadlock detection and
 * concurrent processing both left at their default (off).
 *
 * @param channel the channel used as transport (may be null)
 * @param l       listener for messages not handled by the correlator (may be null)
 * @param l2      listener for membership events (may be null)
 */
public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2) {
    // Same initialisation sequence as the fully parameterised constructor, with both flags false.
    this(channel, l, l2, false, false);
}
076:
/**
 * Creates and starts a dispatcher on top of <code>channel</code> with concurrent processing off.
 *
 * @param channel            the channel used as transport (may be null)
 * @param l                  listener for messages not handled by the correlator (may be null)
 * @param l2                 listener for membership events (may be null)
 * @param deadlock_detection value handed to the RequestCorrelator when it is created
 */
public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2,
                         boolean deadlock_detection) {
    // Same initialisation sequence as the fully parameterised constructor.
    this(channel, l, l2, deadlock_detection, false);
}
092:
093: public MessageDispatcher(Channel channel, MessageListener l,
094: MembershipListener l2, boolean deadlock_detection,
095: boolean concurrent_processing) {
096: this .channel = channel;
097: this .deadlock_detection = deadlock_detection;
098: this .concurrent_processing = concurrent_processing;
099: prot_adapter = new ProtocolAdapter();
100: if (channel != null) {
101: local_addr = channel.getLocalAddress();
102: }
103: setMessageListener(l);
104: setMembershipListener(l2);
105: if (channel != null) {
106: channel.setUpHandler(prot_adapter);
107: }
108: start();
109: }
110:
/**
 * Creates and starts a channel-based dispatcher that forwards incoming requests to
 * <code>req_handler</code>; deadlock detection and concurrent processing stay off.
 *
 * @param channel     the channel used as transport (may be null)
 * @param l           listener for messages not handled by the correlator (may be null)
 * @param l2          listener for membership events (may be null)
 * @param req_handler handler invoked by {@link #handle(Message)} (may be null)
 */
public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2,
                         RequestHandler req_handler) {
    this(channel, l, l2, req_handler, false, false);
}
116:
/**
 * Creates and starts a channel-based dispatcher that forwards incoming requests to
 * <code>req_handler</code>; concurrent processing stays off.
 *
 * @param channel            the channel used as transport (may be null)
 * @param l                  listener for messages not handled by the correlator (may be null)
 * @param l2                 listener for membership events (may be null)
 * @param req_handler        handler invoked by {@link #handle(Message)} (may be null)
 * @param deadlock_detection value handed to the RequestCorrelator when it is created
 */
public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2,
                         RequestHandler req_handler, boolean deadlock_detection) {
    this(channel, l, l2, req_handler, deadlock_detection, false);
}
123:
/**
 * Creates and starts a channel-based dispatcher, then registers <code>req_handler</code>.
 * Note that the request handler is set only after the dispatcher has been started.
 *
 * @param channel               the channel used as transport (may be null)
 * @param l                     listener for messages not handled by the correlator (may be null)
 * @param l2                    listener for membership events (may be null)
 * @param req_handler           handler invoked by {@link #handle(Message)} (may be null)
 * @param deadlock_detection    value handed to the RequestCorrelator when it is created
 * @param concurrent_processing value handed to the RequestCorrelator when it is created
 */
public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2,
                         RequestHandler req_handler, boolean deadlock_detection,
                         boolean concurrent_processing) {
    this(channel, l, l2, deadlock_detection, concurrent_processing);
    setRequestHandler(req_handler);
}
130:
/**
 * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be
 * used to register under that id. This is typically used when another building block is already using
 * PullPushAdapter, and we want to add this building block in addition. The id is then used to discriminate
 * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
 * first block created on PullPushAdapter.
 * <p>
 * No request handler is installed and concurrent processing is off.
 *
 * @param adapter The PullPushAdapter which to use as underlying transport
 * @param id      A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
 *                requests/responses for different building blocks on top of PullPushAdapter.
 * @param l       listener for messages not handled by the correlator (may be null)
 * @param l2      listener for membership events (may be null)
 */
public MessageDispatcher(PullPushAdapter adapter, Serializable id, MessageListener l,
                         MembershipListener l2) {
    // req_handler is null by default, so passing null here leaves it exactly as before.
    this(adapter, id, l, l2, (RequestHandler) null, false);
}
164:
/**
 * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be
 * used to register under that id. This is typically used when another building block is already using
 * PullPushAdapter, and we want to add this building block in addition. The id is then used to discriminate
 * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
 * first block created on PullPushAdapter.
 * <p>
 * Concurrent processing is off.
 *
 * @param adapter     The PullPushAdapter which to use as underlying transport
 * @param id          A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex)
 *                    between requests/responses for different building blocks on top of PullPushAdapter.
 * @param l           listener for messages not handled by the correlator (may be null)
 * @param l2          listener for membership events (may be null)
 * @param req_handler The object implementing RequestHandler. It will be called when a request is received
 */
public MessageDispatcher(PullPushAdapter adapter, Serializable id, MessageListener l,
                         MembershipListener l2, RequestHandler req_handler) {
    this(adapter, id, l, l2, req_handler, false);
}
202:
203: public MessageDispatcher(PullPushAdapter adapter, Serializable id,
204: MessageListener l, MembershipListener l2,
205: RequestHandler req_handler, boolean concurrent_processing) {
206: this .concurrent_processing = concurrent_processing;
207: this .adapter = adapter;
208: this .id = id;
209: setMembers(((Channel) adapter.getTransport()).getView()
210: .getMembers());
211: setRequestHandler(req_handler);
212: setMessageListener(l);
213: setMembershipListener(l2);
214: handler = new PullPushHandler();
215: transport_adapter = new TransportAdapter();
216: adapter.addMembershipListener(handler);
217: if (id == null) { // no other building block around, let's become the main consumer of this PullPushAdapter
218: adapter.setListener(handler);
219: } else {
220: adapter.registerListener(id, handler);
221: }
222:
223: Transport tp;
224: if ((tp = adapter.getTransport()) instanceof Channel) {
225: local_addr = ((Channel) tp).getLocalAddress(); // fixed bug #800774
226: }
227:
228: start();
229: }
230:
231: /** Returns a copy of members */
232: protected Collection getMembers() {
233: synchronized (members) {
234: return new ArrayList(members);
235: }
236: }
237:
238: /**
239: * If this dispatcher is using a user-provided PullPushAdapter, then need to set the members from the adapter
240: * initially since viewChange has most likely already been called in PullPushAdapter.
241: */
242: private void setMembers(Vector new_mbrs) {
243: if (new_mbrs != null) {
244: synchronized (members) {
245: members.clear();
246: members.addAll(new_mbrs);
247: }
248: }
249: }
250:
251: public void setDeadlockDetection(boolean flag) {
252: deadlock_detection = flag;
253: if (corr != null)
254: corr.setDeadlockDetection(flag);
255: }
256:
257: public void setConcurrentProcessing(boolean flag) {
258: this .concurrent_processing = flag;
259: if (corr != null)
260: corr.setConcurrentProcessing(flag);
261: }
262:
263: public final void start() {
264: if (corr == null) {
265: if (transport_adapter != null) {
266: corr = new RequestCorrelator("MessageDispatcher",
267: transport_adapter, this , deadlock_detection,
268: local_addr, concurrent_processing);
269: } else {
270: corr = new RequestCorrelator("MessageDispatcher",
271: prot_adapter, this , deadlock_detection,
272: local_addr, concurrent_processing);
273: }
274: }
275: correlatorStarted();
276: corr.start();
277: if (channel != null) {
278: Vector tmp_mbrs = channel.getView() != null ? channel
279: .getView().getMembers() : null;
280: setMembers(tmp_mbrs);
281: }
282: }
283:
284: protected void correlatorStarted() {
285: ;
286: }
287:
288: public void stop() {
289: if (corr != null) {
290: corr.stop();
291: }
292:
293: // fixes leaks of MembershipListeners (http://jira.jboss.com/jira/browse/JGRP-160)
294: if (adapter != null && handler != null) {
295: adapter.removeMembershipListener(handler);
296: }
297: }
298:
299: public final void setMessageListener(MessageListener l) {
300: msg_listener = l;
301: }
302:
303: /**
304: * Gives access to the currently configured MessageListener. Returns null if there is no
305: * configured MessageListener.
306: */
307: public MessageListener getMessageListener() {
308: return msg_listener;
309: }
310:
311: public final void setMembershipListener(MembershipListener l) {
312: membership_listener = l;
313: }
314:
315: public final void setRequestHandler(RequestHandler rh) {
316: req_handler = rh;
317: }
318:
319: /**
320: * Offers access to the underlying Channel.
321: * @return a reference to the underlying Channel.
322: */
323: public Channel getChannel() {
324: return channel;
325: }
326:
327: public void send(Message msg) throws ChannelNotConnectedException,
328: ChannelClosedException {
329: if (channel != null) {
330: channel.send(msg);
331: } else if (adapter != null) {
332: try {
333: if (id != null) {
334: adapter.send(id, msg);
335: } else {
336: adapter.send(msg);
337: }
338: } catch (Throwable ex) {
339: if (log.isErrorEnabled()) {
340: log.error("exception=" + Util.print(ex));
341: }
342: }
343: } else {
344: if (log.isErrorEnabled()) {
345: log.error("channel == null");
346: }
347: }
348: }
349:
350: public RspList castMessage(final Vector dests, Message msg,
351: int mode, long timeout) {
352: return castMessage(dests, msg, mode, timeout, false);
353: }
354:
355: /**
356: * Cast a message to all members, and wait for <code>mode</code> responses. The responses are returned in a response
357: * list, where each response is associated with its sender.<p> Uses <code>GroupRequest</code>.
358: *
359: * @param dests The members to which the message is to be sent. If it is null, then the message is sent to all
360: * members
361: * @param msg The message to be sent to n members
362: * @param mode Defined in <code>GroupRequest</code>. The number of responses to wait for: <ol> <li>GET_FIRST:
363: * return the first response received. <li>GET_ALL: wait for all responses (minus the ones from
364: * suspected members) <li>GET_MAJORITY: wait for a majority of all responses (relative to the grp
365: * size) <li>GET_ABS_MAJORITY: wait for majority (absolute, computed once) <li>GET_N: wait for n
366: * responses (may block if n > group size) <li>GET_NONE: wait for no responses, return immediately
367: * (non-blocking) </ol>
368: * @param timeout If 0: wait forever. Otherwise, wait for <code>mode</code> responses <em>or</em> timeout time.
369: * @return RspList A list of responses. Each response is an <code>Object</code> and associated to its sender.
370: */
371: public RspList castMessage(final Vector dests, Message msg,
372: int mode, long timeout, boolean use_anycasting) {
373: GroupRequest _req = null;
374: Vector real_dests;
375: Channel tmp;
376:
377: // we need to clone because we don't want to modify the original
378: // (we remove ourselves if LOCAL is false, see below) !
379: // real_dests=dests != null ? (Vector) dests.clone() : (members != null ? new Vector(members) : null);
380: if (dests != null) {
381: real_dests = (Vector) dests.clone();
382: } else {
383: synchronized (members) {
384: real_dests = new Vector(members);
385: }
386: }
387:
388: // if local delivery is off, then we should not wait for the message from the local member.
389: // therefore remove it from the membership
390: tmp = channel;
391: if (tmp == null) {
392: if (adapter != null
393: && adapter.getTransport() instanceof Channel) {
394: tmp = (Channel) adapter.getTransport();
395: }
396: }
397:
398: if (tmp != null
399: && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) {
400: if (local_addr == null) {
401: local_addr = tmp.getLocalAddress();
402: }
403: if (local_addr != null && real_dests != null) {
404: real_dests.removeElement(local_addr);
405: }
406: }
407:
408: // don't even send the message if the destination list is empty
409: if (log.isTraceEnabled())
410: log.trace("real_dests=" + real_dests);
411:
412: if (real_dests == null || real_dests.size() == 0) {
413: if (log.isTraceEnabled())
414: log
415: .trace("destination list is empty, won't send message");
416: return new RspList(); // return empty response list
417: }
418:
419: _req = new GroupRequest(msg, corr, real_dests, mode, timeout, 0);
420: _req.setCaller(this .local_addr);
421: try {
422: _req.execute(use_anycasting);
423: } catch (Exception ex) {
424: throw new RuntimeException("failed executing request "
425: + _req, ex);
426: }
427:
428: return _req.getResults();
429: }
430:
431: /**
432: * Multicast a message request to all members in <code>dests</code> and receive responses via the RspCollector
433: * interface. When done receiving the required number of responses, the caller has to call done(req_id) on the
434: * underlyinh RequestCorrelator, so that the resources allocated to that request can be freed.
435: *
436: * @param dests The list of members from which to receive responses. Null means all members
437: * @param req_id The ID of the request. Used by the underlying RequestCorrelator to correlate responses with
438: * requests
439: * @param msg The request to be sent
440: * @param coll The sender needs to provide this interface to collect responses. Call will return immediately if
441: * this is null
442: */
443: public void castMessage(final Vector dests, long req_id,
444: Message msg, RspCollector coll) {
445: Vector real_dests;
446: Channel tmp;
447:
448: if (msg == null) {
449: if (log.isErrorEnabled())
450: log.error("request is null");
451: return;
452: }
453:
454: if (coll == null) {
455: if (log.isErrorEnabled())
456: log
457: .error("response collector is null (must be non-null)");
458: return;
459: }
460:
461: // we need to clone because we don't want to modify the original
462: // (we remove ourselves if LOCAL is false, see below) !
463: //real_dests=dests != null ? (Vector) dests.clone() : (Vector) members.clone();
464: if (dests != null) {
465: real_dests = (Vector) dests.clone();
466: } else {
467: synchronized (members) {
468: real_dests = new Vector(members);
469: }
470: }
471:
472: // if local delivery is off, then we should not wait for the message from the local member.
473: // therefore remove it from the membership
474: tmp = channel;
475: if (tmp == null) {
476: if (adapter != null
477: && adapter.getTransport() instanceof Channel) {
478: tmp = (Channel) adapter.getTransport();
479: }
480: }
481:
482: if (tmp != null
483: && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) {
484: if (local_addr == null) {
485: local_addr = tmp.getLocalAddress();
486: }
487: if (local_addr != null) {
488: real_dests.removeElement(local_addr);
489: }
490: }
491:
492: // don't even send the message if the destination list is empty
493: if (real_dests.size() == 0) {
494: if (log.isDebugEnabled())
495: log
496: .debug("destination list is empty, won't send message");
497: return;
498: }
499:
500: try {
501: corr.sendRequest(req_id, real_dests, msg, coll);
502: } catch (Exception e) {
503: throw new RuntimeException("failure sending request "
504: + req_id + " to " + real_dests, e);
505: }
506: }
507:
508: public void done(long req_id) {
509: corr.done(req_id);
510: }
511:
512: /**
513: * Sends a message to a single member (destination = msg.dest) and returns the response. The message's destination
514: * must be non-zero !
515: */
516: public Object sendMessage(Message msg, int mode, long timeout)
517: throws TimeoutException, SuspectedException {
518: Vector mbrs = new Vector();
519: RspList rsp_list = null;
520: Object dest = msg.getDest();
521: Rsp rsp;
522: GroupRequest _req = null;
523:
524: if (dest == null) {
525: if (log.isErrorEnabled())
526: log
527: .error("the message's destination is null, cannot send message");
528: return null;
529: }
530:
531: mbrs.addElement(dest); // dummy membership (of destination address)
532:
533: _req = new GroupRequest(msg, corr, mbrs, mode, timeout, 0);
534: _req.setCaller(local_addr);
535: try {
536: _req.execute();
537: } catch (Exception t) {
538: throw new RuntimeException("failed executing request "
539: + _req, t);
540: }
541:
542: if (mode == GroupRequest.GET_NONE) {
543: return null;
544: }
545:
546: rsp_list = _req.getResults();
547:
548: if (rsp_list.size() == 0) {
549: if (log.isWarnEnabled())
550: log.warn(" response list is empty");
551: return null;
552: }
553: if (rsp_list.size() > 1) {
554: if (log.isWarnEnabled())
555: log
556: .warn("response list contains more that 1 response; returning first response !");
557: }
558: rsp = (Rsp) rsp_list.elementAt(0);
559: if (rsp.wasSuspected()) {
560: throw new SuspectedException(dest);
561: }
562: if (!rsp.wasReceived()) {
563: throw new TimeoutException();
564: }
565: return rsp.getValue();
566: }
567:
568: // public void channelConnected(Channel channel) {
569: // if(channel != null) {
570: // Address new_local_addr=channel.getLocalAddress();
571: // if(new_local_addr != null) {
572: // this.local_addr=new_local_addr;
573: //
574: // if(log.isInfoEnabled()) log.info("MessageDispatcher.channelConnected()", "new local address is " + this.local_addr);
575: // }
576: // }
577: // }
578: //
579: // public void channelDisconnected(Channel channel) {
580: // }
581: //
582: // public void channelClosed(Channel channel) {
583: // }
584: //
585: // public void channelShunned() {
586: // }
587: //
588: // public void channelReconnected(Address addr) {
589: // if(channel != null) {
590: // Address new_local_addr=channel.getLocalAddress();
591: // if(new_local_addr != null) {
592: // this.local_addr=new_local_addr;
593: //
594: // if(log.isInfoEnabled()) log.info("MessageDispatcher.channelReconnected()", "new local address is " + this.local_addr);
595: // }
596: // }
597: // }
598:
599: /* ------------------------ RequestHandler Interface ---------------------- */
600: public Object handle(Message msg) {
601: if (req_handler != null) {
602: return req_handler.handle(msg);
603: } else {
604: return null;
605: }
606: }
607:
608: /* -------------------- End of RequestHandler Interface ------------------- */
609:
610: class ProtocolAdapter extends Protocol implements UpHandler {
611:
612: /* ------------------------- Protocol Interface --------------------------- */
613:
614: public String getName() {
615: return "MessageDispatcher";
616: }
617:
618: public void startUpHandler() {
619: // do nothing, DON'T REMOVE !!!!
620: }
621:
622: public void startDownHandler() {
623: // do nothing, DON'T REMOVE !!!!
624: }
625:
626: public void stopInternal() {
627: // do nothing, DON'T REMOVE !!!!
628: }
629:
630: protected void receiveUpEvent(Event evt) {
631: }
632:
633: protected void receiveDownEvent(Event evt) {
634: }
635:
636: /**
637: * Called by request correlator when message was not generated by it. We handle it and call the message
638: * listener's corresponding methods
639: */
640: public void passUp(Event evt) {
641: switch (evt.getType()) {
642: case Event.MSG:
643: if (msg_listener != null) {
644: msg_listener.receive((Message) evt.getArg());
645: }
646: break;
647:
648: case Event.GET_APPLSTATE: // reply with GET_APPLSTATE_OK
649: StateTransferInfo info = (StateTransferInfo) evt
650: .getArg();
651: String state_id = info.state_id;
652: byte[] tmp_state = null;
653: if (msg_listener != null) {
654: try {
655: if (msg_listener instanceof ExtendedMessageListener
656: && state_id != null) {
657: tmp_state = ((ExtendedMessageListener) msg_listener)
658: .getState(state_id);
659: } else {
660: tmp_state = msg_listener.getState();
661: }
662: } catch (Throwable t) {
663: this .log.error(
664: "failed getting state from message listener ("
665: + msg_listener + ')', t);
666: }
667: }
668: channel.returnState(tmp_state, state_id);
669: break;
670:
671: case Event.GET_STATE_OK:
672: if (msg_listener != null) {
673: try {
674: info = (StateTransferInfo) evt.getArg();
675: String id = info.state_id;
676: if (msg_listener instanceof ExtendedMessageListener
677: && id != null) {
678: ((ExtendedMessageListener) msg_listener)
679: .setState(id, info.state);
680: } else {
681: msg_listener.setState(info.state);
682: }
683: } catch (ClassCastException cast_ex) {
684: if (this .log.isErrorEnabled())
685: this .log
686: .error("received SetStateEvent, but argument "
687: + evt.getArg()
688: + " is not serializable. Discarding message.");
689: }
690: }
691: break;
692:
693: case Event.STATE_TRANSFER_OUTPUTSTREAM:
694: if (msg_listener != null) {
695: StateTransferInfo sti = (StateTransferInfo) evt
696: .getArg();
697: OutputStream os = sti.outputStream;
698: if (os != null
699: && msg_listener instanceof ExtendedMessageListener) {
700: if (sti.state_id == null)
701: ((ExtendedMessageListener) msg_listener)
702: .getState(os);
703: else
704: ((ExtendedMessageListener) msg_listener)
705: .getState(sti.state_id, os);
706: }
707: return;
708: }
709: break;
710:
711: case Event.STATE_TRANSFER_INPUTSTREAM:
712: if (msg_listener != null) {
713: StateTransferInfo sti = (StateTransferInfo) evt
714: .getArg();
715: InputStream is = sti.inputStream;
716: if (is != null
717: && msg_listener instanceof ExtendedMessageListener) {
718: if (sti.state_id == null)
719: ((ExtendedMessageListener) msg_listener)
720: .setState(is);
721: else
722: ((ExtendedMessageListener) msg_listener)
723: .setState(sti.state_id, is);
724: }
725: }
726: break;
727:
728: case Event.VIEW_CHANGE:
729: View v = (View) evt.getArg();
730: Vector new_mbrs = v.getMembers();
731: setMembers(new_mbrs);
732: if (membership_listener != null) {
733: membership_listener.viewAccepted(v);
734: }
735: break;
736:
737: case Event.SET_LOCAL_ADDRESS:
738: if (log.isTraceEnabled())
739: log.trace("setting local_addr (" + local_addr
740: + ") to " + evt.getArg());
741: local_addr = (Address) evt.getArg();
742: break;
743:
744: case Event.SUSPECT:
745: if (membership_listener != null) {
746: membership_listener.suspect((Address) evt.getArg());
747: }
748: break;
749:
750: case Event.BLOCK:
751: if (membership_listener != null) {
752: membership_listener.block();
753: }
754: channel.blockOk();
755: break;
756: case Event.UNBLOCK:
757: if (membership_listener instanceof ExtendedMembershipListener) {
758: ((ExtendedMembershipListener) membership_listener)
759: .unblock();
760: }
761: break;
762: }
763: }
764:
765: public void passDown(Event evt) {
766: down(evt);
767: }
768:
769: /**
770: * Called by channel (we registered before) when event is received. This is the UpHandler interface.
771: */
772: public void up(Event evt) {
773: if (corr != null) {
774: corr.receive(evt); // calls passUp()
775: } else {
776: if (log.isErrorEnabled()) { //Something is seriously wrong, correlator should not be null since latch is not locked!
777: log
778: .error("correlator is null, event will be ignored (evt="
779: + evt + ")");
780: }
781: }
782: }
783:
784: public void down(Event evt) {
785: if (channel != null) {
786: channel.down(evt);
787: } else if (this .log.isWarnEnabled()) {
788: this .log.warn("channel is null, discarding event "
789: + evt);
790: }
791: }
792: /* ----------------------- End of Protocol Interface ------------------------ */
793:
794: }
795:
796: class TransportAdapter implements Transport {
797:
798: public void send(Message msg) throws Exception {
799: if (channel != null) {
800: channel.send(msg);
801: } else if (adapter != null) {
802: try {
803: if (id != null) {
804: adapter.send(id, msg);
805: } else {
806: adapter.send(msg);
807: }
808: } catch (Throwable ex) {
809: if (log.isErrorEnabled()) {
810: log.error("exception=" + Util.print(ex));
811: }
812: }
813: } else {
814: if (log.isErrorEnabled()) {
815: log.error("channel == null");
816: }
817: }
818: }
819:
820: public Object receive(long timeout) throws Exception {
821: return null;
822: }
823: }
824:
825: class PullPushHandler implements ExtendedMessageListener,
826: MembershipListener {
827:
828: /* ------------------------- MessageListener interface ---------------------- */
829: public void receive(Message msg) {
830: boolean pass_up = true;
831: if (corr != null) {
832: pass_up = corr.receiveMessage(msg);
833: }
834:
835: if (pass_up) { // pass on to MessageListener
836: if (msg_listener != null) {
837: msg_listener.receive(msg);
838: }
839: }
840: }
841:
842: public byte[] getState() {
843: return msg_listener != null ? msg_listener.getState()
844: : null;
845: }
846:
847: public byte[] getState(String state_id) {
848: if (msg_listener == null)
849: return null;
850: if (msg_listener instanceof ExtendedMessageListener
851: && state_id != null) {
852: return ((ExtendedMessageListener) msg_listener)
853: .getState(state_id);
854: } else {
855: return msg_listener.getState();
856: }
857: }
858:
859: public void setState(byte[] state) {
860: if (msg_listener != null) {
861: msg_listener.setState(state);
862: }
863: }
864:
865: public void setState(String state_id, byte[] state) {
866: if (msg_listener != null) {
867: if (msg_listener instanceof ExtendedMessageListener
868: && state_id != null) {
869: ((ExtendedMessageListener) msg_listener).setState(
870: state_id, state);
871: } else {
872: msg_listener.setState(state);
873: }
874: }
875: }
876:
877: public void getState(OutputStream ostream) {
878: if (msg_listener instanceof ExtendedMessageListener) {
879: ((ExtendedMessageListener) msg_listener)
880: .getState(ostream);
881: }
882: }
883:
884: public void getState(String state_id, OutputStream ostream) {
885: if (msg_listener instanceof ExtendedMessageListener
886: && state_id != null) {
887: ((ExtendedMessageListener) msg_listener).getState(
888: state_id, ostream);
889: }
890:
891: }
892:
893: public void setState(InputStream istream) {
894: if (msg_listener instanceof ExtendedMessageListener) {
895: ((ExtendedMessageListener) msg_listener)
896: .setState(istream);
897: }
898: }
899:
900: public void setState(String state_id, InputStream istream) {
901: if (msg_listener instanceof ExtendedMessageListener
902: && state_id != null) {
903: ((ExtendedMessageListener) msg_listener).setState(
904: state_id, istream);
905: }
906: }
907:
908: /*
909: * --------------------- End of MessageListener interface
910: * -------------------
911: */
912:
913: /* ------------------------ MembershipListener interface -------------------- */
914: public void viewAccepted(View v) {
915: if (corr != null) {
916: corr.receiveView(v);
917: }
918:
919: Vector new_mbrs = v.getMembers();
920: setMembers(new_mbrs);
921: if (membership_listener != null) {
922: membership_listener.viewAccepted(v);
923: }
924: }
925:
926: public void suspect(Address suspected_mbr) {
927: if (corr != null) {
928: corr.receiveSuspect(suspected_mbr);
929: }
930: if (membership_listener != null) {
931: membership_listener.suspect(suspected_mbr);
932: }
933: }
934:
935: public void block() {
936: if (membership_listener != null) {
937: membership_listener.block();
938: }
939: }
940:
941: /* --------------------- End of MembershipListener interface ---------------- */
942:
943: // @todo: receive SET_LOCAL_ADDR event and call corr.setLocalAddress(addr)
944: }
945:
946: }
|