001: // $Id: RpcDispatcher.java,v 1.26.2.1 2006/12/04 13:52:51 belaban Exp $
002:
003: package org.jgroups.blocks;
004:
005: import org.jgroups.*;
006: import org.jgroups.util.RspList;
007: import org.jgroups.util.Util;
008:
009: import java.io.Serializable;
010: import java.lang.reflect.Method;
011: import java.util.ArrayList;
012: import java.util.Iterator;
013: import java.util.List;
014: import java.util.Vector;
015:
016: /**
017: * This class allows a programmer to invoke remote methods in all (or single)
018: * group members and optionally wait for the return value(s).
019: * An application will typically create a channel and layer the
020: * RpcDispatcher building block on top of it, which allows it to
021: * dispatch remote methods (client role) and at the same time be
022: * called by other members (server role).
023: * This class is derived from MessageDispatcher.
024: * Is the equivalent of RpcProtocol on the application rather than protocol level.
025: * @author Bela Ban
026: */
027: public class RpcDispatcher extends MessageDispatcher implements
028: ChannelListener {
029: protected Object server_obj = null;
030: /** Marshaller to marshall requests at the caller and unmarshal requests at the receiver(s) */
031: protected Marshaller req_marshaller = null;
032:
033: /** Marshaller to marshal responses at the receiver(s) and unmarshal responses at the caller */
034: protected Marshaller rsp_marshaller = null;
035: protected final List additionalChannelListeners = new ArrayList();
036: protected MethodLookup method_lookup = null;
037:
038: public RpcDispatcher(Channel channel, MessageListener l,
039: MembershipListener l2, Object server_obj) {
040: super (channel, l, l2);
041: channel.addChannelListener(this );
042: this .server_obj = server_obj;
043: }
044:
045: public RpcDispatcher(Channel channel, MessageListener l,
046: MembershipListener l2, Object server_obj,
047: boolean deadlock_detection) {
048: super (channel, l, l2, deadlock_detection);
049: channel.addChannelListener(this );
050: this .server_obj = server_obj;
051: }
052:
053: public RpcDispatcher(Channel channel, MessageListener l,
054: MembershipListener l2, Object server_obj,
055: boolean deadlock_detection, boolean concurrent_processing) {
056: super (channel, l, l2, deadlock_detection, concurrent_processing);
057: channel.addChannelListener(this );
058: this .server_obj = server_obj;
059: }
060:
061: public RpcDispatcher(PullPushAdapter adapter, Serializable id,
062: MessageListener l, MembershipListener l2, Object server_obj) {
063: super (adapter, id, l, l2);
064:
065: // Fixes bug #804956
066: // channel.setChannelListener(this);
067: if (this .adapter != null) {
068: Transport t = this .adapter.getTransport();
069: if (t != null && t instanceof Channel) {
070: ((Channel) t).addChannelListener(this );
071: }
072: }
073:
074: this .server_obj = server_obj;
075: }
076:
077: public interface Marshaller {
078: byte[] objectToByteBuffer(Object obj) throws Exception;
079:
080: Object objectFromByteBuffer(byte[] buf) throws Exception;
081: }
082:
083: public String getName() {
084: return "RpcDispatcher";
085: }
086:
087: public Marshaller getRequestMarshaller() {
088: return req_marshaller;
089: }
090:
091: public void setRequestMarshaller(Marshaller m) {
092: this .req_marshaller = m;
093: }
094:
095: public Marshaller getResponseMarshaller() {
096: return rsp_marshaller;
097: }
098:
099: public void setResponseMarshaller(Marshaller m) {
100: this .rsp_marshaller = m;
101: if (corr != null)
102: corr.setMarshaller(m);
103: }
104:
105: public Marshaller getMarshaller() {
106: return req_marshaller;
107: }
108:
109: public void setMarshaller(Marshaller m) {
110: req_marshaller = m;
111: }
112:
113: public Object getServerObject() {
114: return server_obj;
115: }
116:
117: public void setServerObject(Object server_obj) {
118: this .server_obj = server_obj;
119: }
120:
121: public MethodLookup getMethodLookup() {
122: return method_lookup;
123: }
124:
125: public void setMethodLookup(MethodLookup method_lookup) {
126: this .method_lookup = method_lookup;
127: }
128:
129: public RspList castMessage(Vector dests, Message msg, int mode,
130: long timeout) {
131: if (log.isErrorEnabled())
132: log
133: .error("this method should not be used with "
134: + "RpcDispatcher, but MessageDispatcher. Returning null");
135: return null;
136: }
137:
138: public Object sendMessage(Message msg, int mode, long timeout)
139: throws TimeoutException, SuspectedException {
140: if (log.isErrorEnabled())
141: log
142: .error("this method should not be used with "
143: + "RpcDispatcher, but MessageDispatcher. Returning null");
144: return null;
145: }
146:
147: public RspList callRemoteMethods(Vector dests, String method_name,
148: Object[] args, Class[] types, int mode, long timeout) {
149: return callRemoteMethods(dests, method_name, args, types, mode,
150: timeout, false);
151: }
152:
153: public RspList callRemoteMethods(Vector dests, String method_name,
154: Object[] args, Class[] types, int mode, long timeout,
155: boolean use_anycasting) {
156: MethodCall method_call = new MethodCall(method_name, args,
157: types);
158: return callRemoteMethods(dests, method_call, mode, timeout,
159: use_anycasting);
160: }
161:
162: public RspList callRemoteMethods(Vector dests, String method_name,
163: Object[] args, String[] signature, int mode, long timeout) {
164: return callRemoteMethods(dests, method_name, args, signature,
165: mode, timeout, false);
166: }
167:
168: public RspList callRemoteMethods(Vector dests, String method_name,
169: Object[] args, String[] signature, int mode, long timeout,
170: boolean use_anycasting) {
171: MethodCall method_call = new MethodCall(method_name, args,
172: signature);
173: return callRemoteMethods(dests, method_call, mode, timeout,
174: use_anycasting);
175: }
176:
177: public RspList callRemoteMethods(Vector dests,
178: MethodCall method_call, int mode, long timeout) {
179: return callRemoteMethods(dests, method_call, mode, timeout,
180: false);
181: }
182:
183: public RspList callRemoteMethods(Vector dests,
184: MethodCall method_call, int mode, long timeout,
185: boolean use_anycasting) {
186: if (dests != null && dests.size() == 0) {
187: // don't send if dest list is empty
188: if (log.isTraceEnabled())
189: log
190: .trace(new StringBuffer("destination list of ")
191: .append(method_call.getName())
192: .append(
193: "() is empty: no need to send message"));
194: return new RspList();
195: }
196:
197: if (log.isTraceEnabled())
198: log.trace(new StringBuffer("dests=").append(dests).append(
199: ", method_call=").append(method_call).append(
200: ", mode=").append(mode).append(", timeout=")
201: .append(timeout));
202:
203: byte[] buf;
204: try {
205: buf = req_marshaller != null ? req_marshaller
206: .objectToByteBuffer(method_call) : Util
207: .objectToByteBuffer(method_call);
208: } catch (Exception e) {
209: // if(log.isErrorEnabled()) log.error("exception", e);
210: // we will change this in 2.4 to add the exception to the signature
211: // (see http://jira.jboss.com/jira/browse/JGRP-193). The reason for a RTE is that we cannot change the
212: // signature in 2.3, otherwise 2.3 would be *not* API compatible to prev releases
213: throw new RuntimeException(
214: "failure to marshal argument(s)", e);
215: }
216:
217: Message msg = new Message(null, null, buf);
218: RspList retval = super .castMessage(dests, msg, mode, timeout,
219: use_anycasting);
220: if (log.isTraceEnabled())
221: log.trace("responses: " + retval);
222: return retval;
223: }
224:
225: public Object callRemoteMethod(Address dest, String method_name,
226: Object[] args, Class[] types, int mode, long timeout)
227: throws Throwable {
228: MethodCall method_call = new MethodCall(method_name, args,
229: types);
230: return callRemoteMethod(dest, method_call, mode, timeout);
231: }
232:
233: public Object callRemoteMethod(Address dest, String method_name,
234: Object[] args, String[] signature, int mode, long timeout)
235: throws Throwable {
236: MethodCall method_call = new MethodCall(method_name, args,
237: signature);
238: return callRemoteMethod(dest, method_call, mode, timeout);
239: }
240:
241: public Object callRemoteMethod(Address dest,
242: MethodCall method_call, int mode, long timeout)
243: throws Throwable {
244: byte[] buf = null;
245: Message msg = null;
246: Object retval = null;
247:
248: if (log.isTraceEnabled())
249: log.trace("dest=" + dest + ", method_call=" + method_call
250: + ", mode=" + mode + ", timeout=" + timeout);
251:
252: buf = req_marshaller != null ? req_marshaller
253: .objectToByteBuffer(method_call) : Util
254: .objectToByteBuffer(method_call);
255: msg = new Message(dest, null, buf);
256: retval = super .sendMessage(msg, mode, timeout);
257: if (log.isTraceEnabled())
258: log.trace("retval: " + retval);
259: if (retval instanceof Throwable)
260: throw (Throwable) retval;
261: return retval;
262: }
263:
264: protected void correlatorStarted() {
265: if (corr != null)
266: corr.setMarshaller(rsp_marshaller);
267: }
268:
269: /**
270: * Message contains MethodCall. Execute it against *this* object and return result.
271: * Use MethodCall.invoke() to do this. Return result.
272: */
273: public Object handle(Message req) {
274: Object body = null;
275: MethodCall method_call;
276:
277: if (server_obj == null) {
278: if (log.isErrorEnabled())
279: log
280: .error("no method handler is registered. Discarding request.");
281: return null;
282: }
283:
284: if (req == null || req.getLength() == 0) {
285: if (log.isErrorEnabled())
286: log.error("message or message buffer is null");
287: return null;
288: }
289:
290: try {
291: body = req_marshaller != null ? req_marshaller
292: .objectFromByteBuffer(req.getBuffer()) : req
293: .getObject();
294: } catch (Throwable e) {
295: if (log.isErrorEnabled())
296: log.error("exception marshalling object", e);
297: return e;
298: }
299:
300: if (body == null || !(body instanceof MethodCall)) {
301: if (log.isErrorEnabled())
302: log
303: .error("message does not contain a MethodCall object");
304: return null;
305: }
306:
307: method_call = (MethodCall) body;
308:
309: try {
310: if (log.isTraceEnabled())
311: log.trace("[sender=" + req.getSrc()
312: + "], method_call: " + method_call);
313:
314: if (method_call.getMode() == MethodCall.ID) {
315: if (method_lookup == null)
316: throw new Exception("MethodCall uses ID="
317: + method_call.getId()
318: + ", but method_lookup has not been set");
319: Method m = method_lookup
320: .findMethod(method_call.getId());
321: if (m == null)
322: throw new Exception("no method foudn for "
323: + method_call.getId());
324: method_call.setMethod(m);
325: }
326:
327: return method_call.invoke(server_obj);
328: } catch (Throwable x) {
329: return x;
330: }
331: }
332:
333: /**
334: * Add a new channel listener to be notified on the channel's state change.
335: *
336: * @return true if the listener was added or false if the listener was already in the list.
337: */
338: public boolean addChannelListener(ChannelListener l) {
339:
340: synchronized (additionalChannelListeners) {
341: if (additionalChannelListeners.contains(l)) {
342: return false;
343: }
344: additionalChannelListeners.add(l);
345: return true;
346: }
347: }
348:
349: /**
350: *
351: * @return true if the channel was removed indeed.
352: */
353: public boolean removeChannelListener(ChannelListener l) {
354:
355: synchronized (additionalChannelListeners) {
356: return additionalChannelListeners.remove(l);
357: }
358: }
359:
360: /* --------------------- Interface ChannelListener ---------------------- */
361:
362: public void channelConnected(Channel channel) {
363:
364: synchronized (additionalChannelListeners) {
365: for (Iterator i = additionalChannelListeners.iterator(); i
366: .hasNext();) {
367: ChannelListener l = (ChannelListener) i.next();
368: try {
369: l.channelConnected(channel);
370: } catch (Throwable t) {
371: log.warn("channel listener failed", t);
372: }
373: }
374: }
375: }
376:
377: public void channelDisconnected(Channel channel) {
378:
379: stop();
380:
381: synchronized (additionalChannelListeners) {
382: for (Iterator i = additionalChannelListeners.iterator(); i
383: .hasNext();) {
384: ChannelListener l = (ChannelListener) i.next();
385: try {
386: l.channelDisconnected(channel);
387: } catch (Throwable t) {
388: log.warn("channel listener failed", t);
389: }
390: }
391: }
392: }
393:
394: public void channelClosed(Channel channel) {
395:
396: stop();
397:
398: synchronized (additionalChannelListeners) {
399: for (Iterator i = additionalChannelListeners.iterator(); i
400: .hasNext();) {
401: ChannelListener l = (ChannelListener) i.next();
402: try {
403: l.channelClosed(channel);
404: } catch (Throwable t) {
405: log.warn("channel listener failed", t);
406: }
407: }
408: }
409: }
410:
411: public void channelShunned() {
412:
413: synchronized (additionalChannelListeners) {
414: for (Iterator i = additionalChannelListeners.iterator(); i
415: .hasNext();) {
416: ChannelListener l = (ChannelListener) i.next();
417: try {
418: l.channelShunned();
419: } catch (Throwable t) {
420: log.warn("channel listener failed", t);
421: }
422: }
423: }
424: }
425:
426: public void channelReconnected(Address new_addr) {
427: if (log.isTraceEnabled())
428: log.trace("channel has been rejoined, old local_addr="
429: + local_addr + ", new local_addr=" + new_addr);
430: this .local_addr = new_addr;
431: start();
432:
433: synchronized (additionalChannelListeners) {
434: for (Iterator i = additionalChannelListeners.iterator(); i
435: .hasNext();) {
436: ChannelListener l = (ChannelListener) i.next();
437: try {
438: l.channelReconnected(new_addr);
439: } catch (Throwable t) {
440: log.warn("channel listener failed", t);
441: }
442: }
443: }
444: }
445: /* ----------------------------------------------------------------------- */
446:
447: }
|