001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.axis2.description;
020:
021: import org.apache.axiom.om.util.UUIDGenerator;
022: import org.apache.axiom.soap.SOAPBody;
023: import org.apache.axiom.soap.SOAPEnvelope;
024: import org.apache.axis2.AxisFault;
025: import org.apache.axis2.Constants;
026: import org.apache.axis2.addressing.EndpointReference;
027: import org.apache.axis2.client.OperationClient;
028: import org.apache.axis2.client.Options;
029: import org.apache.axis2.client.async.AsyncResult;
030: import org.apache.axis2.client.async.Callback;
031: import org.apache.axis2.client.async.AxisCallback;
032: import org.apache.axis2.context.ConfigurationContext;
033: import org.apache.axis2.context.MessageContext;
034: import org.apache.axis2.context.OperationContext;
035: import org.apache.axis2.context.ServiceContext;
036: import org.apache.axis2.engine.AxisEngine;
037: import org.apache.axis2.i18n.Messages;
038: import org.apache.axis2.transport.TransportUtils;
039: import org.apache.axis2.transport.http.HTTPConstants;
040: import org.apache.axis2.util.CallbackReceiver;
041: import org.apache.axis2.util.Utils;
042: import org.apache.axis2.wsdl.WSDLConstants;
043: import org.apache.commons.logging.Log;
044: import org.apache.commons.logging.LogFactory;
045:
046: import javax.xml.namespace.QName;
047: import java.util.HashMap;
048:
049: public class OutInAxisOperation extends TwoChannelAxisOperation {
050: public OutInAxisOperation() {
051: super ();
052: //setup a temporary name
053: QName tmpName = new QName(this .getClass().getName() + "_"
054: + UUIDGenerator.getUUID());
055: this .setName(tmpName);
056: setMessageExchangePattern(WSDL2Constants.MEP_URI_OUT_IN);
057: }
058:
059: public OutInAxisOperation(QName name) {
060: super (name);
061: setMessageExchangePattern(WSDL2Constants.MEP_URI_OUT_IN);
062: }
063:
064: public void addMessageContext(MessageContext msgContext,
065: OperationContext opContext) throws AxisFault {
066: HashMap mep = opContext.getMessageContexts();
067: MessageContext immsgContext = (MessageContext) mep
068: .get(MESSAGE_LABEL_IN_VALUE);
069: MessageContext outmsgContext = (MessageContext) mep
070: .get(MESSAGE_LABEL_OUT_VALUE);
071:
072: if ((immsgContext != null) && (outmsgContext != null)) {
073: throw new AxisFault(Messages.getMessage("mepcompleted"));
074: }
075:
076: if (outmsgContext == null) {
077: mep.put(MESSAGE_LABEL_OUT_VALUE, msgContext);
078: } else {
079: mep.put(MESSAGE_LABEL_IN_VALUE, msgContext);
080: opContext.setComplete(true);
081: opContext.cleanup();
082: }
083: }
084:
085: /**
086: * Returns a MEP client for an Out-IN operation. This client can be used to
087: * interact with a server which is offering an In-Out operation. To use the
088: * client, you must call addMessageContext() with a message context and then
089: * call execute() to execute the client.
090: *
091: * @param sc The service context for this client to live within. Cannot be
092: * null.
093: * @param options Options to use as defaults for this client. If any options are
094: * set specifically on the client then those override options
095: * here.
096: */
097: public OperationClient createClient(ServiceContext sc,
098: Options options) {
099: return new OutInAxisOperationClient(this , sc, options);
100: }
101: }
102:
/**
 * MEP client for the Out-In message exchange pattern. Instances are created by
 * OutInAxisOperation.createClient().
 */
106: class OutInAxisOperationClient extends OperationClient {
107:
108: private static Log log = LogFactory
109: .getLog(OutInAxisOperationClient.class);
110:
/**
 * Creates a client for the given Out-In operation. All three arguments are handed
 * unchanged to the OperationClient constructor.
 *
 * @param axisOp  the operation this client executes
 * @param sc      the service context the client lives within
 * @param options the options used as defaults for this client
 */
OutInAxisOperationClient(OutInAxisOperation axisOp, ServiceContext sc, Options options) {
    super(axisOp, sc, options);
}
115:
116: /**
117: * Adds message context to operation context, so that it will handle the
118: * logic correctly if the OperationContext is null then new one will be
119: * created, and Operation Context will become null when some one calls reset().
120: *
121: * @param msgContext the MessageContext to add
122: * @throws AxisFault
123: */
124: public void addMessageContext(MessageContext msgContext)
125: throws AxisFault {
126: msgContext.setServiceContext(sc);
127: if (msgContext.getMessageID() == null) {
128: setMessageID(msgContext);
129: }
130: axisOp.registerOperationContext(msgContext, oc);
131: }
132:
133: /**
134: * Returns the message context for a given message label.
135: *
136: * @param messageLabel :
137: * label of the message and that can be either "Out" or "In" and
138: * nothing else
139: * @return Returns MessageContext.
140: * @throws AxisFault
141: */
142: public MessageContext getMessageContext(String messageLabel)
143: throws AxisFault {
144: return oc.getMessageContext(messageLabel);
145: }
146:
147: public void setCallback(Callback callback) {
148: this .callback = callback;
149: }
150:
151: /**
152: * Executes the MEP. What this does depends on the specific MEP client. The
153: * basic idea is to have the MEP client execute and do something with the
154: * messages that have been added to it so far. For example, if its an Out-In
155: * MEP, then if the Out message has been set, then executing the client asks
156: * it to send the message and get the In message, possibly using a different
157: * thread.
158: *
159: * @param block Indicates whether execution should block or return ASAP. What
160: * block means is of course a function of the specific MEP
161: * client. IGNORED BY THIS MEP CLIENT.
162: * @throws AxisFault if something goes wrong during the execution of the MEP.
163: */
164: public void executeImpl(boolean block) throws AxisFault {
165: if (log.isDebugEnabled()) {
166: log.debug("Entry: OutInAxisOperationClient::execute, "
167: + block);
168: }
169: if (completed) {
170: throw new AxisFault(Messages.getMessage("mepiscomplted"));
171: }
172: ConfigurationContext cc = sc.getConfigurationContext();
173:
174: // copy interesting info from options to message context.
175: MessageContext mc = oc
176: .getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
177: if (mc == null) {
178: throw new AxisFault(Messages.getMessage("outmsgctxnull"));
179: }
180: prepareMessageContext(cc, mc);
181:
182: if (options.getTransportIn() == null
183: && mc.getTransportIn() == null) {
184: mc.setTransportIn(ClientUtils.inferInTransport(cc
185: .getAxisConfiguration(), options, mc));
186: } else if (mc.getTransportIn() == null) {
187: mc.setTransportIn(options.getTransportIn());
188: }
189:
190: /**
191: * If a module has set the USE_ASYNC_OPERATIONS option then we override the behaviour
192: * for sync calls, and effectively USE_CUSTOM_LISTENER too. However we leave real
193: * async calls alone.
194: */
195: boolean useAsync = false;
196: if (!options.isUseSeparateListener()) {
197: Boolean useAsyncOption = (Boolean) mc
198: .getProperty(Constants.Configuration.USE_ASYNC_OPERATIONS);
199: if (useAsyncOption != null) {
200: useAsync = useAsyncOption.booleanValue();
201: }
202: }
203: EndpointReference replyTo = mc.getReplyTo();
204: if (replyTo != null && replyTo.hasNoneAddress()) {
205: throw new AxisFault(
206: replyTo.getAddress()
207: + ""
208: + " can not be used with OutInAxisOperationClient , user either "
209: + "fireAndForget or sendRobust)");
210: }
211: if (replyTo != null && !replyTo.hasAnonymousAddress()) {
212: useAsync = true;
213: }
214:
215: if (useAsync || options.isUseSeparateListener()) {
216: sendAsync(useAsync, mc);
217: } else {
218: if (block) {
219: // Send the SOAP Message and receive a response
220: send(mc);
221: completed = true;
222: } else {
223: sc.getConfigurationContext().getThreadPool().execute(
224: new NonBlockingInvocationWorker(callback, mc,
225: axisCallback));
226: }
227: }
228: }
229:
230: private void sendAsync(boolean useAsync, MessageContext mc)
231: throws AxisFault {
232: if (log.isDebugEnabled()) {
233: log.debug("useAsync=" + useAsync + ", seperateListener="
234: + options.isUseSeparateListener());
235: }
236: /**
237: * We are following the async path. If the user hasn't set a callback object then we must
238: * block until the whole MEP is complete, as they have no other way to get their reply message.
239: */
240: CallbackReceiver callbackReceiver;
241: if (axisOp.getMessageReceiver() != null
242: && axisOp.getMessageReceiver() instanceof CallbackReceiver) {
243: callbackReceiver = (CallbackReceiver) axisOp
244: .getMessageReceiver();
245: } else {
246: if (log.isDebugEnabled()) {
247: log.debug("Creating new callback receiver");
248: }
249: callbackReceiver = new CallbackReceiver();
250: axisOp.setMessageReceiver(callbackReceiver);
251: }
252:
253: SyncCallBack internalCallback = null;
254: if (callback != null) {
255: callbackReceiver.addCallback(mc.getMessageID(), callback);
256: } else if (axisCallback != null) {
257: callbackReceiver.addCallback(mc.getMessageID(),
258: axisCallback);
259: } else {
260: if (log.isDebugEnabled()) {
261: log.debug("Creating internal callback");
262: }
263: internalCallback = new SyncCallBack();
264: callbackReceiver.addCallback(mc.getMessageID(),
265: internalCallback);
266: }
267:
268: /**
269: * If USE_CUSTOM_LISTENER is set to 'true' the replyTo value will not be replaced and Axis2 will not
270: * start its internal listner. Some other enntity (e.g. a module) should take care of obtaining the
271: * response message.
272: */
273: Boolean useCustomListener = (Boolean) options
274: .getProperty(Constants.Configuration.USE_CUSTOM_LISTENER);
275: if (useAsync) {
276: useCustomListener = Boolean.TRUE;
277: }
278: if (useCustomListener == null
279: || !useCustomListener.booleanValue()) {
280: if (mc.getReplyTo() == null) {
281: EndpointReference replyToFromTransport = mc
282: .getConfigurationContext().getListenerManager()
283: .getEPRforService(
284: sc.getAxisService().getName(),
285: axisOp.getName().getLocalPart(),
286: mc.getTransportIn().getName());
287:
288: if (mc.getReplyTo() == null) {
289: mc.setReplyTo(replyToFromTransport);
290: } else {
291: mc.getReplyTo().setAddress(
292: replyToFromTransport.getAddress());
293: }
294: }
295: }
296:
297: //if we don't do this , this guy will wait till it gets HTTP 202 in the HTTP case
298: mc.setProperty(MessageContext.TRANSPORT_NON_BLOCKING,
299: Boolean.TRUE);
300: mc.getConfigurationContext().registerOperationContext(
301: mc.getMessageID(), oc);
302: AxisEngine.send(mc);
303: if (internalCallback != null) {
304: internalCallback.waitForCompletion(options
305: .getTimeOutInMilliSeconds());
306:
307: // process the result of the invocation
308: if (internalCallback.envelope == null) {
309: if (internalCallback.error == null) {
310: log
311: .error("Callback had neither error nor response");
312: }
313: if (options.isExceptionToBeThrownOnSOAPFault()) {
314: throw AxisFault.makeFault(internalCallback.error);
315: }
316: }
317: }
318: }
319:
320: /**
321: * When synchronous send() gets back a response MessageContext, this is the workhorse
322: * method which processes it.
323: *
324: * @param responseMessageContext the active response MessageContext
325: * @throws AxisFault if something went wrong
326: */
327: protected void handleResponse(MessageContext responseMessageContext)
328: throws AxisFault {
329: // Options object reused above so soapAction needs to be removed so
330: // that soapAction+wsa:Action on response don't conflict
331: responseMessageContext.setSoapAction(null);
332:
333: if (responseMessageContext.getEnvelope() == null) {
334: // If request is REST we assume the responseMessageContext is REST, so
335: // set the variable
336: /*
337: * old code here was using the outbound message context to set the inbound SOAP namespace,
338: * as such and passing it to TransportUtils.createSOAPMessage
339: *
340: * msgctx.getEnvelope().getNamespace().getNamespaceURI()
341: *
342: * However, the SOAP1.2 spec, appendix A indicates that if a SOAP1.2 message is sent to a SOAP1.1
343: * endpoint, we will get a SOAP1.1 (fault) message response. We need another way to set
344: * the inbound SOAP version. Best way to do this is to trust the content type and let
345: * createSOAPMessage take care of figuring out what the SOAP namespace is.
346: */
347: SOAPEnvelope resenvelope = TransportUtils
348: .createSOAPMessage(responseMessageContext);
349: if (resenvelope != null) {
350: responseMessageContext.setEnvelope(resenvelope);
351: } else {
352: throw new AxisFault(
353: Messages
354: .getMessage("blockingInvocationExpectsResponse"));
355: }
356: }
357: SOAPEnvelope resenvelope = responseMessageContext.getEnvelope();
358: if (resenvelope != null) {
359: AxisEngine.receive(responseMessageContext);
360: if (responseMessageContext.getReplyTo() != null) {
361: sc.setTargetEPR(responseMessageContext.getReplyTo());
362: }
363: if (resenvelope.getBody().hasFault()
364: || responseMessageContext.isProcessingFault()) {
365: if (options.isExceptionToBeThrownOnSOAPFault()) {
366: // does the SOAPFault has a detail element for Excpetion
367: throw Utils
368: .getInboundFaultFromMessageContext(responseMessageContext);
369: }
370: }
371: }
372: }
373:
374: /**
375: * Synchronously send the request and receive a response. This relies on the transport
376: * correctly connecting the response InputStream!
377: *
378: * @param msgContext the request MessageContext to send.
379: * @return Returns MessageContext.
380: * @throws AxisFault Sends the message using a two way transport and waits for a response
381: */
382: protected MessageContext send(MessageContext msgContext)
383: throws AxisFault {
384:
385: // create the responseMessageContext
386:
387: MessageContext responseMessageContext = msgContext
388: .getConfigurationContext().createMessageContext();
389:
390: responseMessageContext.setServerSide(false);
391: responseMessageContext.setOperationContext(msgContext
392: .getOperationContext());
393: responseMessageContext.setOptions(new Options(options));
394: responseMessageContext.setMessageID(msgContext.getMessageID());
395: addMessageContext(responseMessageContext);
396: responseMessageContext.setServiceContext(msgContext
397: .getServiceContext());
398: responseMessageContext.setAxisMessage(axisOp
399: .getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
400:
401: //sending the message
402: AxisEngine.send(msgContext);
403:
404: responseMessageContext.setDoingREST(msgContext.isDoingREST());
405:
406: // Copy RESPONSE properties which the transport set onto the request message context when it processed
407: // the incoming response recieved in reply to an outgoing request.
408: responseMessageContext.setProperty(
409: MessageContext.TRANSPORT_HEADERS, msgContext
410: .getProperty(MessageContext.TRANSPORT_HEADERS));
411: responseMessageContext
412: .setProperty(
413: HTTPConstants.MC_HTTP_STATUS_CODE,
414: msgContext
415: .getProperty(HTTPConstants.MC_HTTP_STATUS_CODE));
416:
417: responseMessageContext.setProperty(MessageContext.TRANSPORT_IN,
418: msgContext.getProperty(MessageContext.TRANSPORT_IN));
419: responseMessageContext.setTransportIn(msgContext
420: .getTransportIn());
421: responseMessageContext.setTransportOut(msgContext
422: .getTransportOut());
423: handleResponse(responseMessageContext);
424: return responseMessageContext;
425: }
426:
427: /**
428: * This class is the workhorse for a non-blocking invocation that uses a two
429: * way transport.
430: */
431: private class NonBlockingInvocationWorker implements Runnable {
432: private Callback callback;
433:
434: private MessageContext msgctx;
435: private AxisCallback axisCallback;
436:
437: public NonBlockingInvocationWorker(Callback callback,
438: MessageContext msgctx, AxisCallback axisCallback) {
439: this .callback = callback;
440: this .msgctx = msgctx;
441: this .axisCallback = axisCallback;
442: }
443:
444: public void run() {
445: try {
446: // send the request and wait for response
447: MessageContext response = send(msgctx);
448: // call the callback
449: if (response != null) {
450: SOAPEnvelope resenvelope = response.getEnvelope();
451: SOAPBody body = resenvelope.getBody();
452: if (body.hasFault()) {
453: // If a fault was found, create an AxisFault with a MessageContext so that
454: // other programming models can deserialize the fault to an alternative form.
455: AxisFault fault = new AxisFault(
456: body.getFault(), response);
457: if (callback != null) {
458: callback.onError(fault);
459: } else {
460: axisCallback.onError(fault);
461: }
462:
463: } else {
464: if (callback != null) {
465: AsyncResult asyncResult = new AsyncResult(
466: response);
467: callback.onComplete(asyncResult);
468: } else {
469: axisCallback.onMessage(response);
470: }
471:
472: }
473: }
474:
475: } catch (Exception e) {
476: if (callback != null) {
477: callback.onError(e);
478: } else {
479: axisCallback.onError(e);
480: }
481:
482: } finally {
483: if (callback != null) {
484: callback.setComplete(true);
485: }
486: }
487: }
488: }
489:
490: /**
491: * This class acts as a callback that allows users to wait on the result.
492: */
493: private class SyncCallBack implements AxisCallback {
494: boolean complete;
495: boolean receivedFault;
496:
497: public boolean waitForCompletion(long timeout) throws AxisFault {
498: synchronized (this ) {
499: try {
500: if (complete)
501: return !receivedFault;
502: wait(timeout);
503: if (!complete) {
504: // We timed out!
505: throw new AxisFault(Messages
506: .getMessage("responseTimeOut"));
507: }
508: } catch (InterruptedException e) {
509: // Something interrupted our wait!
510: error = e;
511: }
512: }
513:
514: if (error != null)
515: throw AxisFault.makeFault(error);
516:
517: return !receivedFault;
518: }
519:
520: /**
521: * This is called when we receive a message.
522: *
523: * @param msgContext the (response) MessageContext
524: */
525: public void onMessage(MessageContext msgContext) {
526: // Transport input stream gets closed after calling setComplete
527: // method. Have to build the whole envelope including the
528: // attachments at this stage. Data might get lost if the input
529: // stream gets closed before building the whole envelope.
530: this .envelope = msgContext.getEnvelope();
531: this .envelope.buildWithAttachments();
532: }
533:
534: /**
535: * This gets called when a fault message is received.
536: *
537: * @param msgContext the MessageContext containing the fault.
538: */
539: public void onFault(MessageContext msgContext) {
540: error = Utils.getInboundFaultFromMessageContext(msgContext);
541: }
542:
543: /**
544: * This is called at the end of the MEP no matter what happens, quite like a
545: * finally block.
546: */
547: public synchronized void onComplete() {
548: complete = true;
549: notify();
550: }
551:
552: private SOAPEnvelope envelope;
553:
554: private Exception error;
555:
556: public void onError(Exception e) {
557: if (log.isDebugEnabled()) {
558: log
559: .debug("Entry: OutInAxisOperationClient$SyncCallBack::onError, "
560: + e);
561: }
562: error = e;
563: if (log.isDebugEnabled()) {
564: log
565: .debug("Exit: OutInAxisOperationClient$SyncCallBack::onError");
566: }
567: }
568: }
569:
570: }
|