001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.axis2.jaxws.client.async;
020:
021: import org.apache.axis2.AxisFault;
022: import org.apache.axis2.client.async.AsyncResult;
023: import org.apache.axis2.client.async.Callback;
024: import org.apache.axis2.jaxws.core.InvocationContext;
025: import org.apache.axis2.jaxws.core.MessageContext;
026: import org.apache.commons.logging.Log;
027: import org.apache.commons.logging.LogFactory;
028:
029: import javax.xml.ws.AsyncHandler;
030: import javax.xml.ws.WebServiceException;
031:
032: import java.util.concurrent.Callable;
033: import java.util.concurrent.Executor;
034: import java.util.concurrent.Future;
035: import java.util.concurrent.FutureTask;
036:
/**
 * The CallbackFuture implements the Axis2 {@link org.apache.axis2.client.async.Callback} API
 * and is registered with the Axis2 engine to receive the asynchronous callback responses.
 * This object is also responsible for taking the {@link java.util.concurrent.Executor} given
 * to it by the JAX-WS client and using it to supply the thread on which the async response
 * is delivered to the JAX-WS {@link javax.xml.ws.AsyncHandler}.
 */
044: public class CallbackFuture extends Callback {
045:
046: private static final Log log = LogFactory
047: .getLog(CallbackFuture.class);
048: private static final boolean debug = log.isDebugEnabled();
049:
050: private CallbackFutureTask cft;
051: private Executor executor;
052: private FutureTask task;
053:
054: private InvocationContext invocationCtx;
055:
056: /*
057: * There are two Async Callback Future.cancel scenario that we address
058: * 1) Client app creates request and call Async Operation. Now before the request is submitted
059: * by JAXWS to Executor for processing and any response is received client decides to cancel
060: * the future task.
061: * 2) Client app creates request and call Async Operation. Request is submitted by JAXWS
062: * to Executor for processing and a response is received and client decides to cancel the future
063: * task.
064: *
065: * We will address both these scenarios in the code. In scenario 1 we will do the following:
066: * 1) Check the for the future.isCancelled before submitting the task to Executor
067: * 2) If cancelled then do not submit the task and do not call the Async Handler of client.
068: * 3)The client program in this case (Since it cancelled the future) will be responsible for cleaning any resources that it engages.
069: *
070: * In Second Scenario we will call the AsyncHandler as Future.isCancelled will be false. As per java doc
071: * the Future cannot be cancelled once the task has been submitted. Also the response has already arrived so
072: * we will make the AsyncHandler and let the client code decided how it wants to treat the response.
073: */
074:
075: @SuppressWarnings("unchecked")
076: public CallbackFuture(InvocationContext ic, AsyncHandler handler) {
077: cft = new CallbackFutureTask(ic.getAsyncResponseListener(),
078: handler);
079: task = new FutureTask(cft);
080: executor = ic.getExecutor();
081:
082: /*
083: * TODO review. We need to save the invocation context so we can set it on the
084: * response (or fault) context so the FutureCallback has access to the handler list.
085: */
086: invocationCtx = ic;
087: }
088:
089: public Future<?> getFutureTask() {
090: return (Future<?>) task;
091: }
092:
093: @Override
094: public void onComplete(AsyncResult result) {
095: if (debug) {
096: log.debug("JAX-WS received the async response");
097: }
098:
099: MessageContext response = null;
100: try {
101: response = AsyncUtils.createJAXWSMessageContext(result);
102: response.setInvocationContext(invocationCtx);
103: // make sure request and response contexts share a single parent
104: response.setMEPContext(invocationCtx
105: .getRequestMessageContext().getMEPContext());
106: } catch (WebServiceException e) {
107: cft.setError(e);
108: if (debug) {
109: log
110: .debug("An error occured while processing the async response. "
111: + e.getMessage());
112: }
113: }
114:
115: if (response == null) {
116: // TODO: throw an exception
117: }
118:
119: cft.setMessageContext(response);
120: execute();
121: }
122:
123: @Override
124: public void onError(Exception e) {
125: // If a SOAPFault was returned by the AxisEngine, the AxisFault
126: // that is returned should have a MessageContext with it. Use
127: // this to unmarshall the fault included there.
128: if (e.getClass().isAssignableFrom(AxisFault.class)) {
129: AxisFault fault = (AxisFault) e;
130: MessageContext faultMessageContext = null;
131: try {
132: faultMessageContext = AsyncUtils
133: .createJAXWSMessageContext(fault
134: .getFaultMessageContext());
135: faultMessageContext.setInvocationContext(invocationCtx);
136: // make sure request and response contexts share a single parent
137: faultMessageContext.setMEPContext(invocationCtx
138: .getRequestMessageContext().getMEPContext());
139: } catch (WebServiceException wse) {
140: cft.setError(wse);
141: }
142:
143: cft.setError(e);
144: cft.setMessageContext(faultMessageContext);
145: } else {
146: cft.setError(e);
147: }
148:
149: execute();
150: }
151:
152: private void execute() {
153: if (log.isDebugEnabled()) {
154: log
155: .debug("Executor task starting to process async response");
156: }
157:
158: if (executor != null) {
159: if (task != null && !task.isCancelled()) {
160: try {
161: executor.execute(task);
162: } catch (Exception executorExc) {
163: if (log.isDebugEnabled()) {
164: log
165: .debug("CallbackFuture.execute(): executor exception ["
166: + executorExc.getClass()
167: .getName() + "]");
168: }
169:
170: // attempt to cancel the FutureTask
171: task.cancel(true);
172:
173: // note: if it is becomes required to return the actual exception
174: // to the client, then we would need to doing something
175: // similar to setting the CallbackFutureTask with the error
176: // and invoking the CallbackFutureTask.call() interface
177: // to process the information
178: //
179: }
180:
181: if (log.isDebugEnabled()) {
182: log.debug("Task submitted to Executor");
183: }
184: } else {
185: if (log.isDebugEnabled()) {
186: log
187: .info("Executor task was not sumbitted as Async Future task was cancelled by clients");
188: }
189: }
190: }
191:
192: if (log.isDebugEnabled()) {
193: log.debug("Executor task completed");
194: }
195: }
196: }
197:
198: class CallbackFutureTask implements Callable {
199:
200: private static final Log log = LogFactory
201: .getLog(CallbackFutureTask.class);
202: private static final boolean debug = log.isDebugEnabled();
203:
204: AsyncResponse response;
205: MessageContext msgCtx;
206: AsyncHandler handler;
207: Exception error;
208:
209: CallbackFutureTask(AsyncResponse r, AsyncHandler h) {
210: response = r;
211: handler = h;
212: }
213:
214: void setMessageContext(MessageContext mc) {
215: msgCtx = mc;
216: }
217:
218: void setError(Exception e) {
219: error = e;
220: }
221:
222: @SuppressWarnings("unchecked")
223: public Object call() throws Exception {
224: // Set the response or fault content on the AsyncResponse object
225: // so that it can be collected inside the Executor thread and processed.
226: if (error != null) {
227: response.onError(error, msgCtx);
228: } else {
229: response.onComplete(msgCtx);
230: }
231:
232: // Now that the content is available, call the JAX-WS AsyncHandler class
233: // to deliver the response to the user.
234: try {
235: ClassLoader cl = handler.getClass().getClassLoader();
236: if (log.isDebugEnabled()) {
237: log.debug("Setting up the thread's ClassLoader");
238: log.debug(cl.toString());
239: }
240: Thread.currentThread().setContextClassLoader(cl);
241:
242: if (debug) {
243: log
244: .debug("Calling JAX-WS AsyncHandler with the Response object");
245: log.debug("AyncHandler class: " + handler.getClass());
246: }
247: handler.handleResponse(response);
248: } catch (Throwable t) {
249: if (debug) {
250: log
251: .debug("An error occured while invoking the callback object.");
252: log.debug("Error: " + t.getMessage());
253: }
254: }
255:
256: return null;
257: }
258: }
|