001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019:
020: package org.apache.axis2.engine;
021:
022: import org.apache.axiom.soap.SOAPEnvelope;
023: import org.apache.axiom.soap.SOAPHeaderBlock;
024: import org.apache.axis2.AxisFault;
025: import org.apache.axis2.client.async.Callback;
026: import org.apache.axis2.client.async.AxisCallback;
027: import org.apache.axis2.context.ConfigurationContext;
028: import org.apache.axis2.context.MessageContext;
029: import org.apache.axis2.context.OperationContext;
030: import org.apache.axis2.description.AxisOperation;
031: import org.apache.axis2.description.TransportOutDescription;
032: import org.apache.axis2.engine.Handler.InvocationResponse;
033: import org.apache.axis2.i18n.Messages;
034: import org.apache.axis2.transport.TransportSender;
035: import org.apache.axis2.util.CallbackReceiver;
036: import org.apache.axis2.util.LoggingControl;
037: import org.apache.axis2.util.MessageContextBuilder;
038: import org.apache.commons.logging.Log;
039: import org.apache.commons.logging.LogFactory;
040:
041: import javax.xml.namespace.QName;
042: import java.util.ArrayList;
043: import java.util.Iterator;
044:
045: /**
046: * There is one engine for the Server and the Client. the send() and receive()
047: * Methods are the basic operations the Sync, Async messageing are build on top.
048: */
049: public class AxisEngine {
050:
051: /**
052: * Field log
053: */
054: private static final Log log = LogFactory.getLog(AxisEngine.class);
055:
056: private static boolean RESUMING_EXECUTION = true;
057: private static boolean NOT_RESUMING_EXECUTION = false;
058:
059: /**
060: * Constructor AxisEngine
061: */
062: public AxisEngine(ConfigurationContext engineContext) {
063: }
064:
065: private static void checkMustUnderstand(MessageContext msgContext)
066: throws AxisFault {
067: SOAPEnvelope envelope = msgContext.getEnvelope();
068: if (envelope.getHeader() == null) {
069: return;
070: }
071:
072: // Get all the headers targeted to us
073: Iterator headerBlocks = envelope.getHeader()
074: .getHeadersToProcess(null);
075:
076: while (headerBlocks.hasNext()) {
077: SOAPHeaderBlock headerBlock = (SOAPHeaderBlock) headerBlocks
078: .next();
079:
080: // if this header block has been processed or mustUnderstand isn't
081: // turned on then its cool
082: if (headerBlock.isProcessed()
083: || !headerBlock.getMustUnderstand()) {
084: continue;
085: }
086:
087: // Oops, throw an appropriate MustUnderstand fault!!
088: QName faultQName = headerBlock.getVersion()
089: .getMustUnderstandFaultCode();
090: throw new AxisFault(Messages.getMessage(
091: "mustunderstandfailed", headerBlock.getNamespace()
092: .getNamespaceURI(), headerBlock
093: .getLocalName()), faultQName);
094: }
095: }
096:
097: /**
098: * This method is called to handle any error that occurs at inflow or outflow. But if the
099: * method is called twice, it implies that sending the error handling has failed, in which case
100: * the method logs the error and exists.
101: *
102: * @deprecated (post 1.1 branch)
103: */
104: public static MessageContext createFaultMessageContext(
105: MessageContext processingContext, Throwable e)
106: throws AxisFault {
107: return MessageContextBuilder.createFaultMessageContext(
108: processingContext, e);
109: }
110:
111: /**
112: * This methods represents the inflow of the Axis, this could be either at the server side or the client side.
113: * Here the <code>ExecutionChain</code> is created using the Phases. The Handlers at the each Phases is ordered in
114: * deployment time by the deployment module
115: *
116: * @throws AxisFault
117: * @see MessageContext
118: * @see Phase
119: * @see Handler
120: */
121: public static InvocationResponse receive(MessageContext msgContext)
122: throws AxisFault {
123: if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
124: log.trace(msgContext.getLogIDString() + " receive:"
125: + msgContext.getMessageID());
126: }
127: ConfigurationContext confContext = msgContext
128: .getConfigurationContext();
129: ArrayList preCalculatedPhases;
130: if (msgContext.isFault() || msgContext.isProcessingFault()) {
131: preCalculatedPhases = confContext.getAxisConfiguration()
132: .getInFaultFlowPhases();
133: msgContext.setFLOW(MessageContext.IN_FAULT_FLOW);
134: } else {
135: preCalculatedPhases = confContext.getAxisConfiguration()
136: .getInFlowPhases();
137: msgContext.setFLOW(MessageContext.IN_FLOW);
138: }
139: // Set the initial execution chain in the MessageContext to a *copy* of what
140: // we got above. This allows individual message processing to change the chain without
141: // affecting later messages.
142: msgContext.setExecutionChain((ArrayList) preCalculatedPhases
143: .clone());
144: try {
145: InvocationResponse pi = invoke(msgContext,
146: NOT_RESUMING_EXECUTION);
147:
148: if (pi.equals(InvocationResponse.CONTINUE)) {
149: checkMustUnderstand(msgContext);
150: if (msgContext.isServerSide()) {
151: // invoke the Message Receivers
152:
153: MessageReceiver receiver = msgContext
154: .getAxisOperation().getMessageReceiver();
155: if (receiver == null) {
156: throw new AxisFault(Messages.getMessage(
157: "nomessagereciever", msgContext
158: .getAxisOperation().getName()
159: .toString()));
160: }
161: receiver.receive(msgContext);
162: }
163: flowComplete(msgContext);
164: } else if (pi.equals(InvocationResponse.SUSPEND)) {
165: return pi;
166: } else if (pi.equals(InvocationResponse.ABORT)) {
167: flowComplete(msgContext);
168: return pi;
169: } else {
170: String errorMsg = "Unrecognized InvocationResponse encountered in AxisEngine.receive()";
171: log.error(msgContext.getLogIDString() + " " + errorMsg);
172: throw new AxisFault(errorMsg);
173: }
174: } catch (AxisFault e) {
175: msgContext.setFailureReason(e);
176: flowComplete(msgContext);
177: throw e;
178: }
179:
180: return InvocationResponse.CONTINUE;
181: }
182:
183: private static void processFault(MessageContext msgContext,
184: AxisFault e) {
185: try {
186: MessageContext faultMC = MessageContextBuilder
187: .createFaultMessageContext(msgContext, e);
188:
189: // Figure out where this goes
190: sendFault(faultMC);
191: } catch (AxisFault axisFault) {
192: log.error(axisFault.getMessage(), axisFault);
193: }
194: }
195:
196: /**
197: * Take the execution chain from the msgContext , and then take the current Index
198: * and invoke all the phases in the arraylist
199: * if the msgContext is pauesd then the execution will be breaked
200: *
201: * @param msgContext
202: * @return An InvocationResponse that indicates what
203: * the next step in the message processing should be.
204: * @throws AxisFault
205: */
206: private static InvocationResponse invoke(MessageContext msgContext,
207: boolean resuming) throws AxisFault {
208:
209: if (msgContext.getCurrentHandlerIndex() == -1) {
210: msgContext.setCurrentHandlerIndex(0);
211: }
212:
213: InvocationResponse pi = InvocationResponse.CONTINUE;
214:
215: while (msgContext.getCurrentHandlerIndex() < msgContext
216: .getExecutionChain().size()) {
217: Handler currentHandler = (Handler) msgContext
218: .getExecutionChain().get(
219: msgContext.getCurrentHandlerIndex());
220:
221: try {
222: if (!resuming) {
223: msgContext.addExecutedPhase(currentHandler);
224: } else {
225: /* If we are resuming the flow, we don't want to add the phase
226: * again, as it has already been added.
227: */
228: resuming = false;
229: }
230: pi = currentHandler.invoke(msgContext);
231: } catch (AxisFault e) {
232: if (msgContext.getCurrentPhaseIndex() == 0) {
233: /* If we got a fault, we still want to add the phase to the
234: list to be executed for flowComplete(...) unless this was
235: the first handler, as then the currentPhaseIndex will be
236: set to 0 and this will look like we've executed all of the
237: handlers. If, at some point, a phase really needs to get
238: notification of flowComplete, then we'll need to introduce
239: some more complex logic to keep track of what has been
240: executed.*/
241: msgContext.removeFirstExecutedPhase();
242: }
243: throw e;
244: }
245:
246: if (pi.equals(InvocationResponse.SUSPEND)
247: || pi.equals(InvocationResponse.ABORT)) {
248: break;
249: }
250:
251: msgContext.setCurrentHandlerIndex(msgContext
252: .getCurrentHandlerIndex() + 1);
253: }
254:
255: return pi;
256: }
257:
258: private static void flowComplete(MessageContext msgContext) {
259: Iterator invokedPhaseIterator = msgContext.getExecutedPhases();
260:
261: while (invokedPhaseIterator.hasNext()) {
262: Handler currentHandler = ((Handler) invokedPhaseIterator
263: .next());
264: currentHandler.flowComplete(msgContext);
265: }
266:
267: /*This is needed because the OutInAxisOperation currently invokes
268: * receive() even when a fault occurs, and we will have already executed
269: * the flowComplete on those before receiveFault() is called.
270: */
271: msgContext.resetExecutedPhases();
272: }
273:
274: /**
275: * If the msgConetext is puased and try to invoke then
276: * first invoke the phase list and after the message receiver
277: *
278: * @param msgContext
279: * @return An InvocationResponse allowing the invoker to perhaps determine
280: * whether or not the message processing will ever succeed.
281: * @throws AxisFault
282: */
283: public static InvocationResponse resumeReceive(
284: MessageContext msgContext) throws AxisFault {
285: if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
286: log.trace(msgContext.getLogIDString() + " resumeReceive:"
287: + msgContext.getMessageID());
288: }
289:
290: //REVIEW: This name is a little misleading, as it seems to indicate that there should be a resumeReceiveFault as well, when, in fact, this does both
291: //REVIEW: Unlike with receive, there is no wrapping try/catch clause which would
292: //fire off the flowComplete on an error, as we have to assume that the
293: //message will be resumed again, but perhaps we need to unwind back to
294: //the point at which the message was resumed and provide another API
295: //to allow the full unwind if the message is going to be discarded.
296: //invoke the phases
297: InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
298: //invoking the MR
299:
300: if (pi.equals(InvocationResponse.CONTINUE)) {
301: checkMustUnderstand(msgContext);
302: if (msgContext.isServerSide()) {
303: // invoke the Message Receivers
304: MessageReceiver receiver = msgContext
305: .getAxisOperation().getMessageReceiver();
306: if (receiver == null) {
307: throw new AxisFault(Messages.getMessage(
308: "nomessagereciever", msgContext
309: .getAxisOperation().getName()
310: .toString()));
311: }
312: receiver.receive(msgContext);
313: }
314: flowComplete(msgContext);
315: }
316:
317: return pi;
318: }
319:
320: /**
321: * To resume the invocation at the send path , this is neened since it is require to call
322: * TransportSender at the end
323: *
324: * @param msgContext
325: * @return An InvocationResponse allowing the invoker to perhaps determine
326: * whether or not the message processing will ever succeed.
327: * @throws AxisFault
328: */
329: public static InvocationResponse resumeSend(
330: MessageContext msgContext) throws AxisFault {
331: if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
332: log.trace(msgContext.getLogIDString() + " resumeSend:"
333: + msgContext.getMessageID());
334: }
335:
336: //REVIEW: This name is a little misleading, as it seems to indicate that there should be a resumeSendFault as well, when, in fact, this does both
337: //REVIEW: Unlike with send, there is no wrapping try/catch clause which would
338: //fire off the flowComplete on an error, as we have to assume that the
339: //message will be resumed again, but perhaps we need to unwind back to
340: //the point at which the message was resumed and provide another API
341: //to allow the full unwind if the message is going to be discarded.
342: //invoke the phases
343: InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
344: //Invoking Transport Sender
345: if (pi.equals(InvocationResponse.CONTINUE)) {
346: // write the Message to the Wire
347: TransportOutDescription transportOut = msgContext
348: .getTransportOut();
349: TransportSender sender = transportOut.getSender();
350: sender.invoke(msgContext);
351: flowComplete(msgContext);
352: }
353:
354: return pi;
355: }
356:
357: /**
358: * Resume processing of a message.
359: *
360: * @param msgctx
361: * @return An InvocationResponse allowing the invoker to perhaps determine
362: * whether or not the message processing will ever succeed.
363: * @throws AxisFault
364: */
365: public static InvocationResponse resume(MessageContext msgctx)
366: throws AxisFault {
367: if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
368: log.trace(msgctx.getLogIDString() + " resume:"
369: + msgctx.getMessageID());
370: }
371:
372: msgctx.setPaused(false);
373: if (msgctx.getFLOW() == MessageContext.IN_FLOW) {
374: return resumeReceive(msgctx);
375: } else {
376: return resumeSend(msgctx);
377: }
378: }
379:
380: /**
381: * This methods represents the outflow of the Axis, this could be either at the server side or the client side.
382: * Here the <code>ExecutionChain</code> is created using the Phases. The Handlers at the each Phases is ordered in
383: * deployment time by the deployment module
384: *
385: * @param msgContext
386: * @throws AxisFault
387: * @see MessageContext
388: * @see Phase
389: * @see Handler
390: */
391: public static void send(MessageContext msgContext) throws AxisFault {
392: if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
393: log.trace(msgContext.getLogIDString() + " send:"
394: + msgContext.getMessageID());
395: }
396: // find and invoke the Phases
397: OperationContext operationContext = msgContext
398: .getOperationContext();
399: ArrayList executionChain = operationContext.getAxisOperation()
400: .getPhasesOutFlow();
401: //rather than having two steps added both oparation and global chain together
402: ArrayList outPhases = new ArrayList();
403: outPhases.addAll(executionChain);
404: outPhases.addAll(msgContext.getConfigurationContext()
405: .getAxisConfiguration().getOutFlowPhases());
406: msgContext.setExecutionChain(outPhases);
407: msgContext.setFLOW(MessageContext.OUT_FLOW);
408: try {
409: InvocationResponse pi = invoke(msgContext,
410: NOT_RESUMING_EXECUTION);
411:
412: if (pi.equals(InvocationResponse.CONTINUE)) {
413: // write the Message to the Wire
414: TransportOutDescription transportOut = msgContext
415: .getTransportOut();
416: if (transportOut == null) {
417: throw new AxisFault(
418: "Transport out has not been set");
419: }
420: TransportSender sender = transportOut.getSender();
421: // This boolean property only used in client side fireAndForget invocation
422: //It will set a property into message context and if some one has set the
423: //property then transport sender will invoke in a diffrent thread
424: Object isTransportNonBlocking = msgContext
425: .getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
426: if (isTransportNonBlocking != null
427: && ((Boolean) isTransportNonBlocking)
428: .booleanValue()) {
429: msgContext
430: .getConfigurationContext()
431: .getThreadPool()
432: .execute(
433: new TransportNonBlockingInvocationWorker(
434: msgContext, sender));
435: } else {
436: sender.invoke(msgContext);
437: }
438: //REVIEW: In the case of the TransportNonBlockingInvocationWorker, does this need to wait until that finishes?
439: flowComplete(msgContext);
440: } else if (pi.equals(InvocationResponse.SUSPEND)) {
441: } else if (pi.equals(InvocationResponse.ABORT)) {
442: flowComplete(msgContext);
443: } else {
444: String errorMsg = "Unrecognized InvocationResponse encountered in AxisEngine.send()";
445: log.error(msgContext.getLogIDString() + " " + errorMsg);
446: throw new AxisFault(errorMsg);
447: }
448: } catch (AxisFault e) {
449: msgContext.setFailureReason(e);
450: flowComplete(msgContext);
451: throw e;
452: }
453: }
454:
455: /**
456: * Sends the SOAP Fault to another SOAP node.
457: *
458: * @param msgContext
459: * @throws AxisFault
460: */
461: public static void sendFault(MessageContext msgContext)
462: throws AxisFault {
463: if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
464: log.trace(msgContext.getLogIDString() + " sendFault:"
465: + msgContext.getMessageID());
466: }
467: OperationContext opContext = msgContext.getOperationContext();
468:
469: //FIXME: If this gets paused in the operation-specific phases, the resume is not going to function correctly as the phases will not have all been set
470:
471: // find and execute the Fault Out Flow Handlers
472: if (opContext != null) {
473: AxisOperation axisOperation = opContext.getAxisOperation();
474: ArrayList faultExecutionChain = axisOperation
475: .getPhasesOutFaultFlow();
476:
477: //adding both operation specific and global out fault flows.
478:
479: ArrayList outFaultPhases = new ArrayList();
480: outFaultPhases.addAll((ArrayList) faultExecutionChain
481: .clone());
482: msgContext.setExecutionChain((ArrayList) outFaultPhases
483: .clone());
484: msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
485: try {
486: InvocationResponse pi = invoke(msgContext,
487: NOT_RESUMING_EXECUTION);
488:
489: if (pi.equals(InvocationResponse.SUSPEND)) {
490: log
491: .warn(msgContext.getLogIDString()
492: + " The resumption of this flow may function incorrectly, as the OutFaultFlow will not be used");
493: return;
494: } else if (pi.equals(InvocationResponse.ABORT)) {
495: flowComplete(msgContext);
496: return;
497: } else if (!pi.equals(InvocationResponse.CONTINUE)) {
498: String errorMsg = "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
499: log.error(msgContext.getLogIDString() + " "
500: + errorMsg);
501: throw new AxisFault(errorMsg);
502: }
503: } catch (AxisFault e) {
504: msgContext.setFailureReason(e);
505: flowComplete(msgContext);
506: throw e;
507: }
508: }
509:
510: msgContext.setExecutionChain((ArrayList) msgContext
511: .getConfigurationContext().getAxisConfiguration()
512: .getOutFaultFlowPhases().clone());
513: msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
514: InvocationResponse pi = invoke(msgContext,
515: NOT_RESUMING_EXECUTION);
516:
517: if (pi.equals(InvocationResponse.CONTINUE)) {
518: // Actually send the SOAP Fault
519: TransportOutDescription transportOut = msgContext
520: .getTransportOut();
521: if (transportOut == null) {
522: throw new AxisFault("Transport out has not been set");
523: }
524: TransportSender sender = transportOut.getSender();
525:
526: sender.invoke(msgContext);
527: flowComplete(msgContext);
528: } else if (pi.equals(InvocationResponse.SUSPEND)) {
529: } else if (pi.equals(InvocationResponse.ABORT)) {
530: flowComplete(msgContext);
531: } else {
532: String errorMsg = "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
533: log.error(msgContext.getLogIDString() + " " + errorMsg);
534: throw new AxisFault(errorMsg);
535: }
536: }
537:
538: /**
539: * This class is used when someone invoke a service invocation with two transports
540: * If we dont create a new thread then the main thread will block untill it gets the
541: * response . In the case of HTTP transportsender will block untill it gets HTTP 200
542: * So , main thread also block till transport sender rereases the tread. So there is no
543: * actual non-blocking. That is why when sending we creat a new thead and send the
544: * requset via that.
545: * <p/>
546: * So whole porpose of this class to send the requset via a new thread
547: * <p/>
548: * way transport.
549: */
550: private static class TransportNonBlockingInvocationWorker implements
551: Runnable {
552: private MessageContext msgctx;
553: private TransportSender sender;
554:
555: public TransportNonBlockingInvocationWorker(
556: MessageContext msgctx, TransportSender sender) {
557: this .msgctx = msgctx;
558: this .sender = sender;
559: }
560:
561: public void run() {
562: try {
563: sender.invoke(msgctx);
564: } catch (Exception e) {
565: log
566: .info(msgctx.getLogIDString() + " "
567: + e.getMessage());
568: if (msgctx
569: .getProperty(MessageContext.DISABLE_ASYNC_CALLBACK_ON_TRANSPORT_ERROR) == null) {
570: AxisOperation axisOperation = msgctx
571: .getAxisOperation();
572: if (axisOperation != null) {
573: MessageReceiver msgReceiver = axisOperation
574: .getMessageReceiver();
575: if ((msgReceiver != null)
576: && (msgReceiver instanceof CallbackReceiver)) {
577: Object callback = ((CallbackReceiver) msgReceiver)
578: .lookupCallback(msgctx
579: .getMessageID());
580: if (callback == null)
581: return; // TODO: should we log this??
582:
583: if (callback instanceof Callback) {
584: ((Callback) callback).onError(e);
585: } else {
586: ((AxisCallback) callback).onError(e);
587: }
588: }
589: }
590: }
591: }
592: }
593: }
594: }
|