001: package org.jacorb.orb;
002:
003: /*
004: * JacORB - a free Java ORB
005: *
006: * Copyright (C) 1997-2004 Gerald Brose.
007: *
008: * This library is free software; you can redistribute it and/or
009: * modify it under the terms of the GNU Library General Public
010: * License as published by the Free Software Foundation; either
011: * version 2 of the License, or (at your option) any later version.
012: *
013: * This library is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
016: * Library General Public License for more details.
017: *
018: * You should have received a copy of the GNU Library General Public
019: * License along with this library; if not, write to the Free
020: * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
021: */
022:
023: import java.util.*;
024:
025: import org.apache.avalon.framework.logger.Logger;
026: import org.apache.avalon.framework.configuration.Configurable;
027:
028: import org.jacorb.orb.giop.MessageInputStream;
029: import org.jacorb.orb.giop.ReplyInputStream;
030: import org.jacorb.orb.giop.ReplyPlaceholder;
031: import org.jacorb.util.Time;
032:
033: import org.omg.CORBA.MARSHAL;
034: import org.omg.CORBA.SystemException;
035: import org.omg.CORBA.portable.ApplicationException;
036: import org.omg.CORBA.portable.InvokeHandler;
037: import org.omg.CORBA.portable.RemarshalException;
038: import org.omg.CORBA.portable.ServantObject;
039: import org.omg.GIOP.ReplyStatusType_1_2;
040: import org.omg.Messaging.ExceptionHolder;
041: import org.omg.TimeBase.UtcT;
042:
043: /**
044: * A special ReplyPlaceholder that receives replies to normal requests,
045: * either synchronously or asynchronously. A ReplyReceiver
046: * handles all ORB-internal work that needs to be done for the reply,
047: * such as checking for exceptions and invoking the interceptors.
048: * The client stub can either do a blocking wait on the ReplyReceiver
049: * (via getReply()), or a ReplyHandler can be supplied when the
050: * ReplyReceiver is created; then the reply is delivered to that
051: * ReplyHandler.
052: *
053: * @author Andre Spiegel <spiegel@gnu.org>
054: * @version $Id: ReplyReceiver.java,v 1.33 2006/08/29 15:02:17 alphonse.bendt Exp $
055: */
056:
057: public class ReplyReceiver extends ReplyPlaceholder implements
058: Configurable {
059: private final org.jacorb.orb.Delegate delegate;
060: private final ClientInterceptorHandler interceptors;
061:
062: private final org.omg.Messaging.ReplyHandler replyHandler;
063:
064: private final String operation;
065: private final Timer timer;
066:
067: private Logger logger;
068:
069: /** configuration properties */
070: private boolean retry_on_failure = false;
071:
072: public ReplyReceiver(org.jacorb.orb.Delegate delegate,
073: String operation, org.omg.TimeBase.UtcT replyEndTime,
074: ClientInterceptorHandler interceptors,
075: org.omg.Messaging.ReplyHandler replyHandler) {
076: super ((org.jacorb.orb.ORB) delegate.orb(null));
077:
078: this .delegate = delegate;
079: this .operation = operation;
080: this .interceptors = interceptors;
081: this .replyHandler = replyHandler;
082:
083: if (replyEndTime != null) {
084: timer = new Timer(replyEndTime);
085: timer.setName("ReplyReceiver Timer");
086: timer.start();
087: } else {
088: timer = null;
089: }
090:
091: }
092:
093: public void configure(
094: org.apache.avalon.framework.configuration.Configuration configuration) {
095: logger = ((org.jacorb.config.Configuration) configuration)
096: .getNamedLogger("jacorb.orb.rep_recv");
097: retry_on_failure = configuration.getAttributeAsBoolean(
098: "jacorb.connection.client.retry_on_failure", false);
099: }
100:
101: public void replyReceived(MessageInputStream in) {
102: if (timeoutException) {
103: return; // discard reply
104: }
105:
106: if (timer != null) {
107: timer.wakeup();
108: }
109:
110: Set pending_replies = delegate.get_pending_replies();
111: // grab pending_replies lock BEFORE my own,
112: // then I will already have it in the replyDone call below.
113: synchronized (pending_replies) {
114: // This internal synchronization prevents a deadlock
115: // when a timeout and a reply coincide, suggested
116: // by Jimmy Wilson, 2005-01. It is only a temporary
117: // work-around though, until I can simplify this entire
118: // logic much more thoroughly, AS.
119: synchronized (lock) {
120: if (timeoutException) {
121: return; // discard reply
122: }
123:
124: this .in = in;
125: delegate.replyDone(this );
126:
127: if (replyHandler != null) {
128: // asynchronous delivery
129: performCallback((ReplyInputStream) in);
130: } else {
131: // synchronous delivery
132: ready = true;
133: lock.notifyAll();
134: }
135: }
136: }
137: }
138:
139: private void performCallback(ReplyInputStream reply) {
140: // TODO: Call interceptors.
141:
142: org.omg.CORBA.portable.Delegate replyHandlerDelegate = ((org.omg.CORBA.portable.ObjectImpl) replyHandler)
143: ._get_delegate();
144:
145: ServantObject so = replyHandlerDelegate.servant_preinvoke(
146: replyHandler, operation, InvokeHandler.class);
147: try {
148: switch (reply.getStatus().value()) {
149: case ReplyStatusType_1_2._NO_EXCEPTION: {
150: ((InvokeHandler) so.servant)._invoke(operation, reply,
151: new DummyResponseHandler());
152: break;
153: }
154: case ReplyStatusType_1_2._USER_EXCEPTION:
155: case ReplyStatusType_1_2._SYSTEM_EXCEPTION: {
156: ExceptionHolderImpl holder = new ExceptionHolderImpl(
157: reply);
158:
159: org.omg.CORBA_2_3.ORB orb = (org.omg.CORBA_2_3.ORB) replyHandlerDelegate
160: .orb(null);
161: orb.register_value_factory(
162: "IDL:omg.org/Messaging/ExceptionHolder:1.0",
163: new ExceptionHolderFactory());
164:
165: CDRInputStream input = new CDRInputStream(orb, holder
166: .marshal());
167:
168: ((InvokeHandler) so.servant)._invoke(operation
169: + "_excep", input, new DummyResponseHandler());
170: break;
171: }
172: }
173: } catch (Exception e) {
174: logger.warn("Exception during callback", e);
175: } finally {
176: replyHandlerDelegate.servant_postinvoke(replyHandler, so);
177: }
178: }
179:
180: /**
181: * There's a lot of code duplication in this method right now.
182: * This should be merged with performCallback() above.
183: */
184: private void performExceptionCallback(ExceptionHolderImpl holder) {
185: // TODO: Call interceptors.
186:
187: org.omg.CORBA.portable.Delegate replyHandlerDelegate = ((org.omg.CORBA.portable.ObjectImpl) replyHandler)
188: ._get_delegate();
189:
190: ServantObject so = replyHandlerDelegate.servant_preinvoke(
191: replyHandler, operation, InvokeHandler.class);
192: try {
193: org.omg.CORBA_2_3.ORB orb = (org.omg.CORBA_2_3.ORB) replyHandlerDelegate
194: .orb(null);
195: orb.register_value_factory(
196: "IDL:omg.org/Messaging/ExceptionHolder:1.0",
197: new ExceptionHolderFactory());
198:
199: CDRInputStream input = new CDRInputStream(orb, holder
200: .marshal());
201:
202: ((InvokeHandler) so.servant)._invoke(operation + "_excep",
203: input, new DummyResponseHandler());
204: } catch (Exception e) {
205: if (logger.isWarnEnabled()) {
206: logger.warn("Exception during callback: "
207: + e.toString());
208: }
209: } finally {
210: replyHandlerDelegate.servant_postinvoke(replyHandler, so);
211: }
212: }
213:
214: /**
215: * This method blocks until a reply becomes available.
216: * If the reply contains any exceptions, they are rethrown.
217: */
218: public synchronized ReplyInputStream getReply()
219: throws RemarshalException, ApplicationException {
220: try {
221: // On NT connection closure due to service shutdown is not
222: // detected until this point, resulting in a COMM_FAILURE.
223: // Map to RemarshalException to force rebind attempt.
224: try {
225: getInputStream(timer != null); // block until reply is available
226: } catch (org.omg.CORBA.COMM_FAILURE ex) {
227: if (retry_on_failure) {
228: throw new RemarshalException();
229: }
230: //rethrow
231: throw ex;
232: }
233: } catch (SystemException se) {
234: interceptors.handle_receive_exception(se);
235: throw se;
236: } catch (RemarshalException re) {
237: // Wait until the thread that received the actual
238: // forward request rebound the Delegate
239: delegate.waitOnBarrier();
240: throw new RemarshalException();
241: }
242:
243: ReplyInputStream reply = (ReplyInputStream) in;
244:
245: ReplyStatusType_1_2 status = delegate.doNotCheckExceptions() ? ReplyStatusType_1_2.NO_EXCEPTION
246: : reply.getStatus();
247:
248: switch (status.value()) {
249: case ReplyStatusType_1_2._NO_EXCEPTION: {
250: interceptors.handle_receive_reply(reply);
251: return reply;
252: }
253: case ReplyStatusType_1_2._USER_EXCEPTION: {
254: ApplicationException ae = getApplicationException(reply);
255: interceptors.handle_receive_exception(ae, reply);
256: throw ae;
257: }
258: case ReplyStatusType_1_2._SYSTEM_EXCEPTION: {
259: SystemException se = SystemExceptionHelper.read(reply);
260: interceptors.handle_receive_exception(se, reply);
261: throw se;
262: }
263: case ReplyStatusType_1_2._LOCATION_FORWARD:
264: case ReplyStatusType_1_2._LOCATION_FORWARD_PERM: {
265: org.omg.CORBA.Object forward_reference = reply
266: .read_Object();
267: interceptors.handle_location_forward(reply,
268: forward_reference);
269: doRebind(forward_reference);
270: throw new RemarshalException();
271: }
272: case ReplyStatusType_1_2._NEEDS_ADDRESSING_MODE: {
273: throw new org.omg.CORBA.NO_IMPLEMENT(
274: "WARNING: Got reply status NEEDS_ADDRESSING_MODE "
275: + "(not implemented).");
276: }
277: default: {
278: throw new MARSHAL("Received unexpected reply status: "
279: + status.value());
280: }
281: }
282: }
283:
284: private void doRebind(org.omg.CORBA.Object forward_reference) {
285: // make other threads that have unreturned replies wait
286: delegate.lockBarrier();
287:
288: try {
289: // tell every pending request to remarshal
290: // they will be blocked on the barrier
291: Set pending_replies = delegate.get_pending_replies();
292: synchronized (pending_replies) {
293: for (Iterator i = pending_replies.iterator(); i
294: .hasNext();) {
295: ReplyPlaceholder p = (ReplyPlaceholder) i.next();
296: p.retry();
297: }
298: }
299:
300: // do the actual rebind
301: delegate.rebind(forward_reference);
302: } finally {
303: // now other threads can safely remarshal
304: delegate.openBarrier();
305: }
306: }
307:
308: private ApplicationException getApplicationException(
309: ReplyInputStream reply) {
310: reply.mark(0);
311: String id = reply.read_string();
312:
313: try {
314: reply.reset();
315: } catch (java.io.IOException ioe) {
316: logger.error("unexpected Exception in reset()", ioe);
317: }
318:
319: return new ApplicationException(id, reply);
320: }
321:
322: /**
323: * A ResponseHandler that is passed to the ReplyHandler's POA
324: * when we invoke it. Since ReplyHandler operations never generate
325: * replies, this ResponseHandler does nothing to this effect.
326: * The createReply() method, however, is the last method that
327: * is called before control goes to the ReplyHandler servant,
328: * so we use it to check for timing constraints.
329: */
330: private class DummyResponseHandler implements
331: org.omg.CORBA.portable.ResponseHandler {
332: public org.omg.CORBA.portable.OutputStream createReply() {
333: // the latest possible time at which we can do this
334: Time.waitFor(delegate.getReplyStartTime());
335: return null;
336: }
337:
338: public org.omg.CORBA.portable.OutputStream createExceptionReply() {
339: return null;
340: }
341: }
342:
343: private static class ExceptionHolderFactory implements
344: org.omg.CORBA.portable.ValueFactory {
345: public java.io.Serializable read_value(
346: org.omg.CORBA_2_3.portable.InputStream is) {
347: ExceptionHolder result = new ExceptionHolderImpl();
348: result._read(is);
349: return result;
350: }
351: }
352:
353: /**
354: * This class implements timeouts while we are waiting for
355: * replies. When it is instantiated, it takes a CORBA UtcT
356: * constructor parameter that specifies the timeout expiration
357: * time. The timer starts running as soon as the Thread is
358: * started. When the timeout goes off, this Timer makes sure
359: * that the enclosing ReplyReceiver is deactivated, and that
360: * everybody associated with it is notified appropriately.
361: * The timeout can be cancelled by calling wakeup() on a Timer.
362: */
363: private class Timer extends Thread {
364: private final UtcT endTime;
365: private boolean awakened = false;
366:
367: public Timer(UtcT endTime) {
368: super ("ReplyReceiverTimer");
369: this .endTime = endTime;
370: }
371:
372: public void run() {
373: synchronized (lock) {
374: timeoutException = false;
375: if (!awakened) {
376: long time = org.jacorb.util.Time.millisTo(endTime);
377: if (time > 0) {
378: try {
379: lock.wait(time);
380: } catch (InterruptedException ex) {
381: logger
382: .info("Interrupted while waiting for timeout");
383: }
384: }
385: if (!awakened) {
386: timeoutException = true;
387:
388: if (replyHandler != null) {
389: ExceptionHolderImpl exHolder = new ExceptionHolderImpl(
390: new org.omg.CORBA.TIMEOUT());
391: performExceptionCallback(exHolder);
392: }
393: ready = true;
394: lock.notifyAll();
395: }
396: }
397: }
398: }
399:
400: public void wakeup() {
401: synchronized (lock) {
402: awakened = true;
403: timeoutException = false;
404: lock.notifyAll();
405: }
406: }
407: }
408: }
|