001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.axis2.jaxws.sample;
020:
021: import java.util.Map;
022: import java.util.concurrent.ExecutionException;
023: import java.util.concurrent.Executor;
024: import java.util.concurrent.ExecutorService;
025: import java.util.concurrent.Executors;
026: import java.util.concurrent.Future;
027: import java.util.concurrent.RejectedExecutionException;
028:
029: import javax.xml.ws.BindingProvider;
030: import javax.xml.ws.Response;
031:
032: import junit.framework.Test;
033: import junit.framework.TestCase;
034: import junit.framework.TestSuite;
035: import org.apache.axis2.jaxws.sample.parallelasync.common.CallbackHandler;
036: import org.apache.axis2.jaxws.sample.parallelasync.server.AsyncPort;
037: import org.apache.axis2.jaxws.sample.parallelasync.server.AsyncService;
038: import org.apache.axis2.jaxws.TestLogger;
039: import org.apache.log4j.BasicConfigurator;
040: import org.test.parallelasync.CustomAsyncResponse;
041: import org.test.parallelasync.SleepResponse;
042: import org.test.parallelasync.WakeUpResponse;
043:
044: /**
045: * Tests for Asynchrony in JAX-WS. Most of the simple invokeAsync/async
046: * exceptions have been covered under jaxws.dispatch and jaxws.proxy test suites
047: *
048: * ExecutionException tests are covered in jaxws.dispatch and jaxws.proxy
049: */
050: public class ParallelAsyncTests extends TestCase {
051:
052: private static final String DOCLITWR_ASYNC_ENDPOINT = "http://localhost:8080/axis2/services/AsyncService";
053:
054: // used for logging
055: private String myClassName = "ParallelAsyncTests";
056:
057: public ParallelAsyncTests(String str) {
058: super (str);
059: }
060:
061: public static Test suite() {
062: TestSuite suite = new TestSuite(ParallelAsyncTests.class);
063: return suite;
064:
065: }
066:
067: public void setUp() {
068: TestLogger.logger.debug("==================== " + getName());
069: }
070:
071: public void testNOOP() {
072: }
073:
074: /**
075: * @testStrategy Check that the web service is up and running
076: * before running any other tests
077: */
078: public void testService_isAlive() throws Exception {
079: final String MESSAGE = "testServiceAlive";
080:
081: String title = myClassName + " : " + getName() + " : ";
082:
083: AsyncPort port = getPort((Executor) null);
084:
085: String req1base = "sleepAsync";
086: String req2base = "remappedAsync";
087:
088: String request1 = null;
089: String request2 = null;
090:
091: for (int i = 0; i < 10; i++) {
092:
093: request1 = req1base + "_" + i;
094: request2 = req2base + "_" + i;
095:
096: TestLogger.logger.debug(title + "iteration [" + i
097: + "] using request1 [" + request1 + "] request2 ["
098: + request2 + "]");
099:
100: // submit request #1 to the server-side web service that
101: // the web service will keep until we ask for it
102: Response<SleepResponse> resp1 = port.sleepAsync(request1);
103:
104: // submit request #2 to the server that essentially processes
105: // without delay
106: Response<CustomAsyncResponse> resp2 = port
107: .remappedAsync(request2);
108:
109: // wait until the response for request #2 is done
110: waitBlocking(resp2);
111:
112: // check the waiting request #1
113: String asleep = port.isAsleep(request1);
114: //System.out.println(title+"iteration ["+i+"] port.isAsleep(request1 ["+request1+"]) = ["+asleep+"]");
115:
116: // wakeup the waiting request #1
117: String wake = port.wakeUp(request1);
118: //System.out.println(title+"iteration ["+i+"] port.wakeUp(request1 ["+request1+"]) = ["+wake+"]");
119:
120: // wait until the response for request #1 is done
121: waitBlocking(resp1);
122:
123: // get the responses
124: String req1_result = null;
125: String req2_result = null;
126:
127: try {
128: req1_result = resp1.get().getMessage();
129: req2_result = resp2.get().getResponse();
130: } catch (Exception e) {
131: TestLogger.logger.debug(title + "iteration [" + i
132: + "] using request1 [" + request1
133: + "] request2 [" + request2
134: + "] : got exception ["
135: + e.getClass().getName() + "] ["
136: + e.getMessage() + "] ");
137: e.printStackTrace();
138: fail(e.toString());
139: }
140:
141: // check status on request #1
142: assertEquals("sleepAsync did not sleep as expected",
143: request1, asleep);
144: assertEquals(
145: "sleepAsync did not return expected response ",
146: request1, req1_result);
147:
148: // check status on request #2
149: assertEquals(
150: "remappedAsync did not return expected response",
151: request2, req2_result);
152:
153: // Calling get() again should return the same object as the first call to get()
154: assertEquals(
155: "sleepAsync did not return expected response ",
156: request1, resp1.get().getMessage());
157: assertEquals(
158: "remappedAsync did not return expected response",
159: request2, resp2.get().getResponse());
160:
161: }
162:
163: // check the callback operation
164: CallbackHandler<SleepResponse> sleepCallbackHandler = new CallbackHandler<SleepResponse>();
165:
166: request1 = req1base + "_with_Callback";
167: //System.out.println(title+" port.sleepAsync("+request1+", callbackHander) being submitted....");
168: Future<?> sr = port.sleepAsync(request1, sleepCallbackHandler);
169:
170: // wait a bit for the server to process the request ...
171: Thread.sleep(500);
172:
173: // check the waiting request
174: String asleepWithCallback = port.isAsleep(request1);
175: //System.out.println(title+" port.isAsleep("+request1+") = ["+asleepWithCallback+"]");
176:
177: // wakeup the waiting request
178: String wake = port.wakeUp(request1);
179: //System.out.println(title+" port.wakeUp("+request1+") = ["+wake+"]");
180:
181: // wait a bit..
182: Thread.sleep(500);
183:
184: // get the response
185: String req_cb_result = null;
186:
187: try {
188:
189: SleepResponse sleepResp = sleepCallbackHandler.get();
190:
191: if (sleepResp != null) {
192: req_cb_result = sleepResp.getMessage();
193: TestLogger.logger.debug(title + " request [" + request1
194: + "] : result [" + req_cb_result + "] ");
195: }
196:
197: } catch (Exception ex) {
198: TestLogger.logger.debug(title + " request [" + request1
199: + "] : got exception [" + ex.getClass().getName()
200: + "] [" + ex.getMessage() + "] ");
201: ex.printStackTrace();
202: fail(ex.toString());
203: }
204:
205: // check status on request
206: assertEquals(
207: "sleepAsync with callback did not sleep as expected",
208: request1, req_cb_result);
209:
210: }
211:
212: /**
213: * @testStrategy Test for ordering an executor to shutdownNow while there
214: * is a request being processed. Uses the default executor.
215: *
216: */
217: public void testService_ExecutorShutdownNow() throws Exception {
218: final String MESSAGE = "testExecutorShutdownNow";
219:
220: String title = myClassName + " : " + getName() + " : ";
221:
222: AsyncService service = getService(null);
223: AsyncPort port = getPort(service);
224:
225: // get the default executor and check to make sure it is an executor service
226: ExecutorService ex = null;
227: Executor executor = service.getExecutor();
228: if ((executor != null) && (executor instanceof ExecutorService)) {
229: ex = (ExecutorService) executor;
230: } else {
231: TestLogger.logger
232: .debug(title
233: + " No executor service available. Nothing to test.");
234: return;
235: }
236:
237: // submit a request to the server that will wait until we ask for it
238: CallbackHandler<SleepResponse> sleepCallbackHandler1 = new CallbackHandler<SleepResponse>();
239:
240: String request1 = "sleepAsync_with_Callback_1";
241:
242: TestLogger.logger.debug(title + " port.sleepAsync(" + request1
243: + ", callbackHander1) #1 being submitted....");
244: Future<?> sr1 = port
245: .sleepAsync(request1, sleepCallbackHandler1);
246: TestLogger.logger.debug(title + " port.sleepAsync(" + request1
247: + ", callbackHander1) #1 .....submitted.");
248:
249: // wait a bit to make sure that the server has the request
250: Thread.sleep(1000);
251:
252: // tell the executor to shutdown immediately, which
253: // attempts to stop all actively executing tasks via Thread.interrupt()
254: // and should prevent new tasks from being submitted
255: TestLogger.logger.debug(title + " shutting down executor ["
256: + ex.getClass().getName() + "]");
257: ex.shutdownNow();
258:
259: // check the waiting request
260: TestLogger.logger.debug(title + " port.isAsleep(" + request1
261: + ") #1 being submitted....");
262: String asleepWithCallback1 = port.isAsleep(request1);
263: TestLogger.logger.debug(title + " port.isAsleep(" + request1
264: + ") #1 = [" + asleepWithCallback1 + "]");
265:
266: // wakeup the waiting request
267: TestLogger.logger.debug(title
268: + " port.wakeUp(request1) #1 being submitted....");
269: String wake1 = port.wakeUp(request1);
270: TestLogger.logger.debug(title + " port.wakeUp(" + request1
271: + ") #1 = [" + wake1 + "]");
272:
273: // wait a bit..
274: Thread.sleep(2000);
275:
276: // check the Future
277: if (sr1.isDone()) {
278: TestLogger.logger.debug(title + " sr1.isDone[TRUE] ");
279: }
280:
281: // try to get the response
282: boolean gotException = false;
283: try {
284:
285: SleepResponse sleepResp1 = sleepCallbackHandler1.get();
286:
287: if (sleepResp1 != null) {
288: TestLogger.logger
289: .debug(title
290: + " request ["
291: + request1
292: + "] #1: sleepResponse [NOT NULL] from callback handler");
293: String result1 = sleepResp1.getMessage();
294: TestLogger.logger.debug(title + " request [" + request1
295: + "] #1: result [" + result1 + "] ");
296: } else {
297: TestLogger.logger
298: .debug(title
299: + " request ["
300: + request1
301: + "] #1: sleepResponse [NULL] from callback handler");
302:
303: // see what the Future says
304: TestLogger.logger.debug(title + " request [" + request1
305: + "] #1: ....check Future response...");
306: Object futureResult = sr1.get();
307: TestLogger.logger.debug(title + " request [" + request1
308: + "] #1: ....Future response [" + futureResult
309: + "]...");
310: }
311:
312: } catch (Exception exc) {
313:
314: TestLogger.logger.debug(title + " request [" + request1
315: + "] : got exception [" + exc.getClass().getName()
316: + "] [" + exc.getMessage() + "] ");
317: gotException = true;
318: }
319:
320: assertTrue(
321: "Did not receive an exception from trying to access the response when the executor service is shutdown.",
322: gotException);
323: }
324:
325: /**
326: * @testStrategy Test for ordering an executor to shutdownNow while there
327: * is a request being processed. Uses an application executor
328: * service.
329: */
330: public void testService_ExecutorShutdownNow_2() throws Exception {
331: final String MESSAGE = "testExecutorShutdownNow_2";
332:
333: String title = myClassName + " : " + getName() + " : ";
334:
335: AsyncService service = getService(null);
336: AsyncPort port = getPort(service);
337:
338: // get the default executor and check to make sure it is an executor service
339: ExecutorService ex = Executors.newSingleThreadExecutor();
340: service.setExecutor(ex);
341:
342: // submit a request to the server that will wait until we ask for it
343: CallbackHandler<SleepResponse> sleepCallbackHandler1 = new CallbackHandler<SleepResponse>();
344:
345: String request1 = "sleepAsync_with_Callback_1";
346:
347: TestLogger.logger.debug(title + " port.sleepAsync(" + request1
348: + ", callbackHander1) #1 being submitted....");
349: Future<?> sr1 = port
350: .sleepAsync(request1, sleepCallbackHandler1);
351: TestLogger.logger.debug(title + " port.sleepAsync(" + request1
352: + ", callbackHander1) #1 .....submitted.");
353:
354: // wait a bit to make sure that the server has the request
355: Thread.sleep(1000);
356:
357: // tell the executor to shutdown immediately, which
358: // attempts to stop all actively executing tasks via Thread.interrupt()
359: // and should prevent new tasks from being submitted
360: TestLogger.logger.debug(title + " shutting down executor ["
361: + ex.getClass().getName() + "]");
362: ex.shutdownNow();
363:
364: // check the waiting request
365: TestLogger.logger.debug(title + " port.isAsleep(" + request1
366: + ") #1 being submitted....");
367: String asleepWithCallback1 = port.isAsleep(request1);
368: TestLogger.logger.debug(title + " port.isAsleep(" + request1
369: + ") #1 = [" + asleepWithCallback1 + "]");
370:
371: // wakeup the waiting request
372: TestLogger.logger.debug(title
373: + " port.wakeUp(request1) #1 being submitted....");
374: String wake1 = port.wakeUp(request1);
375: TestLogger.logger.debug(title + " port.wakeUp(" + request1
376: + ") #1 = [" + wake1 + "]");
377:
378: // wait a bit..
379: Thread.sleep(2000);
380:
381: // check the Future
382: if (sr1.isDone()) {
383: TestLogger.logger.debug(title + " sr1.isDone[TRUE] ");
384: }
385:
386: // try to get the response
387: boolean gotException = false;
388: try {
389:
390: SleepResponse sleepResp1 = sleepCallbackHandler1.get();
391:
392: if (sleepResp1 != null) {
393: TestLogger.logger
394: .debug(title
395: + " request ["
396: + request1
397: + "] #1: sleepResponse [NOT NULL] from callback handler");
398: String result1 = sleepResp1.getMessage();
399: TestLogger.logger.debug(title + " request [" + request1
400: + "] #1: result [" + result1 + "] ");
401: } else {
402: TestLogger.logger
403: .debug(title
404: + " request ["
405: + request1
406: + "] #1: sleepResponse [NULL] from callback handler");
407:
408: // see what the Future says
409: TestLogger.logger.debug(title + " request [" + request1
410: + "] #1: ....check Future response...");
411: Object futureResult = sr1.get();
412: TestLogger.logger.debug(title + " request [" + request1
413: + "] #1: ....Future response [" + futureResult
414: + "]...");
415: }
416:
417: } catch (Exception exc) {
418:
419: TestLogger.logger.debug(title + " request [" + request1
420: + "] : got exception [" + exc.getClass().getName()
421: + "] [" + exc.getMessage() + "] ");
422: gotException = true;
423: }
424:
425: assertTrue(
426: "Did not receive an exception from trying to access the response when the executor service is shutdown.",
427: gotException);
428: }
429:
430: /**
431: * @testStrategy Test for ordering an executor to shutdownNow before there
432: * is a request. Uses the default executor.
433: *
434: */
435: public void testService_ExecutorShutdownNow_3() throws Exception {
436: final String MESSAGE = "testExecutorShutdownNow_3";
437:
438: String title = myClassName + " : " + getName() + " : ";
439:
440: AsyncService service = getService(null);
441: AsyncPort port = getPort(service);
442:
443: // get the default executor and check to make sure it is an executor service
444: ExecutorService ex = null;
445: Executor executor = service.getExecutor();
446: if ((executor != null) && (executor instanceof ExecutorService)) {
447: ex = (ExecutorService) executor;
448:
449: // tell the executor to shutdown immediately, which
450: // attempts to stop all actively executing tasks via Thread.interrupt()
451: // and should prevent new tasks from being submitted
452: TestLogger.logger.debug(title + " shutting down executor ["
453: + ex.getClass().getName() + "]");
454: ex.shutdownNow();
455: } else {
456: TestLogger.logger
457: .debug(title
458: + " No executor service available. Nothing to test.");
459: return;
460: }
461:
462: boolean gotRequestException = false;
463:
464: String request1 = "sleepAsync_with_Callback_1";
465: CallbackHandler<SleepResponse> sleepCallbackHandler1 = new CallbackHandler<SleepResponse>();
466: Future<?> sr1 = null;
467:
468: try {
469: // submit a request to the server that will wait until we ask for it
470: TestLogger.logger.debug(title + " port.sleepAsync("
471: + request1
472: + ", callbackHander1) #1 being submitted....");
473: sr1 = port.sleepAsync(request1, sleepCallbackHandler1);
474: TestLogger.logger.debug(title + " port.sleepAsync("
475: + request1
476: + ", callbackHander1) #1 .....submitted.");
477: } catch (Exception exc) {
478: TestLogger.logger.debug(title + " request [" + request1
479: + "] : got exception [" + exc.getClass().getName()
480: + "] [" + exc.getMessage() + "] ");
481: gotRequestException = true;
482: }
483:
484: // if the request went through, continue processing to see if the response is stopped
485: // this makes sure that the server doesn't keep the request forever
486: boolean gotResponseException = false;
487:
488: if (!gotRequestException) {
489: // wakeup the waiting request
490: TestLogger.logger.debug(title
491: + " port.wakeUp(request1) #1 being submitted....");
492: String wake1 = port.wakeUp(request1);
493: TestLogger.logger.debug(title + " port.wakeUp(" + request1
494: + ") #1 = [" + wake1 + "]");
495:
496: // try to get the response
497: try {
498:
499: SleepResponse sleepResp1 = sleepCallbackHandler1.get();
500:
501: if (sleepResp1 != null) {
502: TestLogger.logger
503: .debug(title
504: + " request ["
505: + request1
506: + "] #1: sleepResponse [NOT NULL] from callback handler");
507: String result1 = sleepResp1.getMessage();
508: TestLogger.logger.debug(title + " request ["
509: + request1 + "] #1: result [" + result1
510: + "] ");
511: } else {
512: TestLogger.logger
513: .debug(title
514: + " request ["
515: + request1
516: + "] #1: sleepResponse [NULL] from callback handler");
517:
518: // see what the Future says
519: TestLogger.logger.debug(title + " request ["
520: + request1
521: + "] #1: ....check Future response...");
522: Object futureResult = sr1.get();
523: TestLogger.logger.debug(title + " request ["
524: + request1 + "] #1: ....Future response ["
525: + futureResult + "]...");
526: }
527:
528: } catch (Exception exc) {
529:
530: TestLogger.logger.debug(title + " request [" + request1
531: + "] : got exception ["
532: + exc.getClass().getName() + "] ["
533: + exc.getMessage() + "] ");
534: gotResponseException = true;
535: }
536: }
537:
538: assertTrue(
539: "Did not receive an exception from trying to submit the request when the executor service is shutdown.",
540: gotRequestException);
541:
542: //assertTrue("Did not receive an exception from trying to access the response when the executor service is shutdown.",gotResponseException);
543: }
544:
545: /**
546: * Auxiliary method used for doing isAsleep checks. Will perform isAsleep
547: * up to a MAX_ISASLEEP_CHECK number of checks. Will sleep for
548: * SLEEP_ISASLEEP_SEC seconds in between requests. If reaches maximum number
549: * fo retries then will fail the test
550: */
551: private boolean isAsleepCheck(String MESSAGE, AsyncPort port) {
552: boolean asleep = false;
553: int check = 30;
554: String msg = null;
555: do {
556: msg = port.isAsleep(MESSAGE);
557: asleep = (msg != null);
558:
559: // fail the test if we ran out of checks
560: if ((check--) == 0)
561: fail("Serve did not receive sleep after several retries");
562:
563: // sleep for a bit
564: try {
565: Thread.sleep(30);
566: } catch (InterruptedException e) {
567: }
568:
569: } while (!asleep);
570:
571: if (asleep) {
572: assertTrue("Sleeping on an incorrect message", MESSAGE
573: .equals(msg));
574: }
575:
576: return true;
577: }
578:
579: private AsyncService getService(Executor ex) {
580: AsyncService service = new AsyncService();
581:
582: if (ex != null)
583: service.setExecutor(ex);
584:
585: if (service.getExecutor() == null) {
586: TestLogger.logger.debug(myClassName
587: + " : getService() : executor is null");
588: } else {
589: TestLogger.logger.debug(myClassName
590: + " : getService() : executor is available ");
591: }
592:
593: return service;
594: }
595:
596: private AsyncPort getPort(AsyncService service) {
597:
598: AsyncPort port = service.getAsyncPort();
599: assertNotNull("Port is null", port);
600:
601: Map<String, Object> rc = ((BindingProvider) port)
602: .getRequestContext();
603: rc.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY,
604: DOCLITWR_ASYNC_ENDPOINT);
605:
606: return port;
607:
608: }
609:
610: /**
611: * Auxiliary method used for obtaining a proxy pre-configured with a
612: * specific Executor
613: */
614: private AsyncPort getPort(Executor ex) {
615: AsyncService service = getService(ex);
616:
617: AsyncPort port = service.getAsyncPort();
618: assertNotNull("Port is null", port);
619:
620: Map<String, Object> rc = ((BindingProvider) port)
621: .getRequestContext();
622: rc.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY,
623: DOCLITWR_ASYNC_ENDPOINT);
624:
625: return port;
626: }
627:
628: private void waitBlocking(Future<?> monitor) {
629: while (!monitor.isDone()) {
630: try {
631: Thread.sleep(1000);
632: } catch (InterruptedException e) {
633: }
634: }
635: }
636: }
|