001: package org.objectweb.celtix.bus.transports.http;
002:
003: import java.io.IOException;
004: import java.io.InputStream;
005: import java.net.URL;
006: import java.util.concurrent.Callable;
007: import java.util.concurrent.Executor;
008: import java.util.concurrent.ExecutorService;
009: import java.util.concurrent.Executors;
010: import java.util.concurrent.Future;
011: import java.util.concurrent.TimeUnit;
012: import java.util.concurrent.TimeoutException;
013: import java.util.concurrent.locks.Condition;
014: import java.util.concurrent.locks.Lock;
015: import java.util.concurrent.locks.ReentrantLock;
016:
017: import javax.wsdl.WSDLException;
018: import javax.xml.namespace.QName;
019:
020: import junit.extensions.TestSetup;
021: import junit.framework.Test;
022: import junit.framework.TestCase;
023: import junit.framework.TestSuite;
024:
025: import org.easymock.classextension.EasyMock;
026: import org.objectweb.celtix.Bus;
027: import org.objectweb.celtix.BusException;
028: import org.objectweb.celtix.bindings.ClientBinding;
029: import org.objectweb.celtix.bus.busimpl.ComponentCreatedEvent;
030: import org.objectweb.celtix.bus.busimpl.ComponentRemovedEvent;
031: import org.objectweb.celtix.bus.configuration.ConfigurationEventFilter;
032: import org.objectweb.celtix.bus.transports.TestResponseCallback;
033: import org.objectweb.celtix.bus.transports.TransportFactoryManagerImpl;
034: import org.objectweb.celtix.bus.workqueue.WorkQueueManagerImpl;
035: import org.objectweb.celtix.bus.wsdl.WSDLManagerImpl;
036: import org.objectweb.celtix.buslifecycle.BusLifeCycleManager;
037: import org.objectweb.celtix.configuration.Configuration;
038: import org.objectweb.celtix.configuration.types.ClassNamespaceMappingListType;
039: import org.objectweb.celtix.configuration.types.ClassNamespaceMappingType;
040: import org.objectweb.celtix.configuration.types.ObjectFactory;
041: import org.objectweb.celtix.context.GenericMessageContext;
042: import org.objectweb.celtix.context.InputStreamMessageContext;
043: import org.objectweb.celtix.context.OutputStreamMessageContext;
044: import org.objectweb.celtix.transports.ClientTransport;
045: import org.objectweb.celtix.transports.ServerTransport;
046: import org.objectweb.celtix.transports.ServerTransportCallback;
047: import org.objectweb.celtix.transports.TransportFactoryManager;
048: import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
049: import org.objectweb.celtix.wsdl.EndpointReferenceUtils;
050: import org.objectweb.celtix.wsdl.WSDLManager;
051: import static org.easymock.EasyMock.isA;
052:
053: public class HTTPTransportTest extends TestCase {
054:
055: private static final QName SERVICE_NAME = new QName(
056: "http://objectweb.org/hello_world_soap_http", "SOAPService");
057: private static final String PORT_NAME = "SoapPort";
058: private static final String ADDRESS = "http://localhost:9000/SoapContext/SoapPort";
059: private static final String DECOUPLED_ADDRESS = "http://localhost:9999/decoupled";
060: private static final int DECOUPLED_PORT = 9999;
061:
062: private static final URL WSDL_URL = HTTPTransportTest.class
063: .getResource("/wsdl/hello_world.wsdl");
064:
065: private static boolean first = true;
066:
067: Bus bus;
068: private WSDLManager wsdlManager;
069: private WorkQueueManagerImpl queueManager;
070: private ExecutorService executorService;
071: private TestResponseCallback responseCallback;
072: private HTTPTransportFactory factory;
073: private Lock partialResponseReceivedLock;
074: private Condition partialResponseReceivedCondition;
075: private boolean partialResponseReceivedNotified;
076: private ClientBinding clientBinding;
077:
/**
 * Creates the test case.
 *
 * @param arg0 value forwarded unchanged to the {@code TestCase} constructor
 */
public HTTPTransportTest(String arg0) {
    super(arg0);
}
081:
082: public static Test suite() throws Exception {
083: TestSuite suite = new TestSuite(HTTPTransportTest.class);
084: return new TestSetup(suite) {
085: protected void tearDown() throws Exception {
086: super .tearDown();
087: JettyHTTPServerEngine.destroyForPort(9000);
088: }
089: };
090: }
091:
092: public static void main(String[] args) {
093: junit.textui.TestRunner.run(HTTPTransportTest.class);
094: }
095:
096: public void setUp() throws BusException {
097: bus = EasyMock.createMock(Bus.class);
098: wsdlManager = new WSDLManagerImpl(null);
099: partialResponseReceivedLock = new ReentrantLock();
100: partialResponseReceivedCondition = partialResponseReceivedLock
101: .newCondition();
102: partialResponseReceivedNotified = false;
103: responseCallback = new TestResponseCallback();
104: clientBinding = EasyMock.createMock(ClientBinding.class);
105: }
106:
107: public void tearDown() throws Exception {
108: EasyMock.reset(bus);
109: try {
110: bus.removeListener(isA(JettyHTTPServerTransport.class));
111: } catch (BusException e) {
112: // TODO nothing to do
113: }
114: EasyMock.expectLastCall();
115: checkBusRemovedEvent();
116: EasyMock.replay(bus);
117:
118: if (queueManager != null) {
119: queueManager.shutdown(false);
120: }
121: if (executorService != null) {
122: executorService.shutdownNow();
123: }
124: JettyHTTPServerEngine.destroyForPort(DECOUPLED_PORT);
125: }
126:
127: int readBytes(byte bytes[], InputStream ins) throws IOException {
128: int len = ins.read(bytes);
129: int total = 0;
130: while (len != -1) {
131: total += len;
132: len = ins.read(bytes, total, bytes.length - total);
133: }
134: return total;
135: }
136:
137: public void testInvokeOneway() throws Exception {
138: doTestInvokeOneway(false);
139: }
140:
141: public void testInvokeOnewayDecoupled() throws Exception {
142: doTestInvokeOneway(true);
143: }
144:
145: public void testInvoke() throws Exception {
146: doTestInvoke(false);
147: doTestInvoke(false);
148: }
149:
150: public void testInvokeDecoupled() throws Exception {
151: doTestInvoke(false, true, ADDRESS);
152: }
153:
154: public void testInvokeUsingAutomaticWorkQueue() throws Exception {
155: doTestInvoke(true);
156: }
157:
158: public void testInvokeDecoupledUsingAutomaticWorkQueue()
159: throws Exception {
160: doTestInvoke(true, true, ADDRESS);
161: }
162:
163: public void testInvokeAsync() throws Exception {
164: doTestInvokeAsync(false);
165: }
166:
167: public void testInvokeAsyncDecoupled() throws Exception {
168: doTestInvokeAsync(false, true);
169: }
170:
171: public void testInvokeAsyncUsingAutomaticWorkQueue()
172: throws Exception {
173: doTestInvokeAsync(true);
174: }
175:
176: public void testInvokeAsyncDecoupledUsingAutomaticWorkQueue()
177: throws Exception {
178: doTestInvokeAsync(true, true);
179: }
180:
181: public void testInputStreamMessageContextCallable()
182: throws Exception {
183: factory = createTransportFactory();
184: HTTPClientTransport.HTTPClientOutputStreamContext octx = EasyMock
185: .createMock(HTTPClientTransport.HTTPClientOutputStreamContext.class);
186: HTTPClientTransport.HTTPClientInputStreamContext ictx = EasyMock
187: .createMock(HTTPClientTransport.HTTPClientInputStreamContext.class);
188: octx.getCorrespondingInputStreamContext();
189: EasyMock.expectLastCall().andReturn(ictx);
190: EasyMock.replay(octx);
191: HTTPClientTransport client = (HTTPClientTransport) createClientTransport(
192: WSDL_URL, SERVICE_NAME, PORT_NAME, ADDRESS, false);
193:
194: Callable c = client.getInputStreamMessageContextCallable(octx);
195: assertNotNull(c);
196: InputStreamMessageContext result = (InputStreamMessageContext) c
197: .call();
198: assertEquals(result, ictx);
199: }
200:
201: public void doTestInvokeOneway(boolean decoupled) throws Exception {
202:
203: factory = createTransportFactory();
204:
205: ServerTransport server = createServerTransport(WSDL_URL,
206: SERVICE_NAME, PORT_NAME, ADDRESS);
207: byte[] buffer = new byte[64];
208: activateServer(server, false, 200, buffer, true, decoupled);
209:
210: ClientTransport client = createClientTransport(WSDL_URL,
211: SERVICE_NAME, PORT_NAME, ADDRESS, decoupled);
212: byte outBytes[] = "Hello World!!!".getBytes();
213:
214: long start = System.currentTimeMillis();
215: OutputStreamMessageContext octx = doRequest(client, outBytes,
216: true, decoupled);
217: client.invokeOneway(octx);
218: long stop = System.currentTimeMillis();
219:
220: octx = doRequest(client, outBytes, false, decoupled);
221: client.invokeOneway(octx);
222: octx = doRequest(client, outBytes, false, decoupled);
223: client.invokeOneway(octx);
224: long stop2 = System.currentTimeMillis();
225:
226: server.deactivate();
227: EasyMock.reset(bus);
228: checkBusRemovedEvent();
229: EasyMock.replay(bus);
230: client.shutdown();
231:
232: assertTrue("Total one call: " + (stop - start),
233: (stop - start) < 400);
234: assertTrue("Total: " + (stop2 - start), (stop2 - start) < 600);
235: assertEquals(new String(outBytes), new String(buffer, 0,
236: outBytes.length));
237: Thread.sleep(200);
238: }
239:
240: public void doTestInvoke(final boolean useAutomaticWorkQueue)
241: throws Exception {
242: doTestInvoke(useAutomaticWorkQueue, false, ADDRESS);
243: }
244:
245: public void doTestInvoke(final boolean useAutomaticWorkQueue,
246: final boolean decoupled, final String address)
247: throws Exception {
248:
249: factory = createTransportFactory();
250:
251: ServerTransport server = createServerTransport(WSDL_URL,
252: SERVICE_NAME, PORT_NAME, address);
253:
254: activateServer(server, useAutomaticWorkQueue, 0, null, false,
255: decoupled);
256: //short request
257: ClientTransport client = createClientTransport(WSDL_URL,
258: SERVICE_NAME, PORT_NAME, address, decoupled);
259: doRequestResponse(client, "Hello World".getBytes(), true,
260: decoupled);
261:
262: //long request
263: byte outBytes[] = new byte[5000];
264: for (int x = 0; x < outBytes.length; x++) {
265: outBytes[x] = (byte) ('a' + (x % 26));
266: }
267: client = createClientTransport(WSDL_URL, SERVICE_NAME,
268: PORT_NAME, address, decoupled);
269: doRequestResponse(client, outBytes, false, decoupled);
270:
271: server.deactivate();
272: outBytes = "HelloWorld".getBytes();
273:
274: try {
275: OutputStreamMessageContext octx = client
276: .createOutputStreamContext(new GenericMessageContext());
277: client.finalPrepareOutputStreamContext(octx);
278: octx.getOutputStream().write(outBytes);
279: octx.getOutputStream().close();
280: InputStreamMessageContext ictx = client.invoke(octx);
281: byte bytes[] = new byte[10000];
282: int len = ictx.getInputStream().read(bytes);
283: if (len != -1
284: && new String(bytes, 0, len)
285: .indexOf("HTTP Status 503") == -1
286: && new String(bytes, 0, len).indexOf("Error 404") == -1) {
287: fail("was able to process a message after the servant was deactivated: "
288: + len + " - " + new String(bytes));
289: }
290: } catch (IOException ex) {
291: //ignore - this is what we want
292: }
293: activateServer(server, useAutomaticWorkQueue, 0, null, false,
294: decoupled);
295: doRequestResponse(client, "Hello World 3".getBytes(), false,
296: decoupled);
297: server.deactivate();
298: activateServer(server, useAutomaticWorkQueue, 0, null, false,
299: decoupled);
300: doRequestResponse(client, "Hello World 4".getBytes(), false,
301: decoupled);
302: server.deactivate();
303: EasyMock.reset(bus);
304: checkBusRemovedEvent();
305: EasyMock.replay(bus);
306: client.shutdown();
307: }
308:
309: public void doTestInvokeAsync(final boolean useAutomaticWorkQueue)
310: throws Exception {
311: doTestInvokeAsync(useAutomaticWorkQueue, false);
312: }
313:
314: public void doTestInvokeAsync(final boolean useAutomaticWorkQueue,
315: boolean decoupled) throws Exception {
316:
317: Executor executor = null;
318: if (useAutomaticWorkQueue) {
319: queueManager = new WorkQueueManagerImpl(bus);
320: executor = queueManager.getAutomaticWorkQueue();
321: } else {
322: executorService = Executors.newFixedThreadPool(1);
323: executor = executorService;
324: }
325: factory = createTransportFactory();
326:
327: ServerTransport server = createServerTransport(WSDL_URL,
328: SERVICE_NAME, PORT_NAME, ADDRESS);
329: activateServer(server, false, 400, null, false, decoupled);
330:
331: ClientTransport client = createClientTransport(WSDL_URL,
332: SERVICE_NAME, PORT_NAME, ADDRESS, decoupled);
333: byte outBytes[] = "Hello World!!!".getBytes();
334:
335: // wait then read without blocking
336: OutputStreamMessageContext octx = doRequest(client, outBytes,
337: true, decoupled);
338: Future<InputStreamMessageContext> f = client.invokeAsync(octx,
339: executor);
340: assertNotNull(f);
341: int i = 0;
342: while (i < 10) {
343: Thread.sleep(100);
344: if (f.isDone()) {
345: break;
346: }
347: i++;
348: }
349: assertTrue(f.isDone());
350: InputStreamMessageContext ictx = f.get();
351: doResponse(client, ictx, outBytes, decoupled);
352:
353: // blocking read (on new thread)
354: octx = doRequest(client, outBytes, false, decoupled);
355: f = client.invokeAsync(octx, executor);
356: ictx = f.get();
357: assertTrue(f.isDone());
358: doResponse(client, ictx, outBytes, decoupled);
359:
360: // blocking read times out
361: boolean timeoutImplemented = false;
362: if (timeoutImplemented) {
363: octx = doRequest(client, outBytes, false, decoupled);
364: f = client.invokeAsync(octx, executor);
365: try {
366: ictx = f.get(200, TimeUnit.MILLISECONDS);
367: fail("Expected TimeoutException not thrown.");
368: } catch (TimeoutException ex) {
369: // ignore
370: }
371: assertTrue(!f.isDone());
372: }
373: server.deactivate();
374: }
375:
376: public void testInvokeNoContext() throws Exception {
377: boolean oldFirst = first;
378: try {
379: first = true;
380: doTestInvoke(false, false, "http://localhost:9888");
381: } finally {
382: first = oldFirst;
383: JettyHTTPServerEngine.destroyForPort(9888);
384: }
385: }
386:
387: private void checkBusCreatedEvent() {
388:
389: bus.sendEvent(isA(ComponentCreatedEvent.class));
390:
391: EasyMock.expectLastCall();
392: }
393:
394: private void checkBusRemovedEvent() {
395:
396: bus.sendEvent(isA(ComponentRemovedEvent.class));
397:
398: EasyMock.expectLastCall();
399: }
400:
401: private void activateServer(ServerTransport server,
402: final boolean useAutomaticWorkQueue, final int delay,
403: final byte[] buffer, final boolean oneWay,
404: final boolean decoupled) throws Exception {
405: ServerTransportCallback callback = new TestServerTransportCallback(
406: server, useAutomaticWorkQueue, delay, buffer, oneWay,
407: decoupled);
408: EasyMock.reset(bus);
409: Configuration bc = EasyMock.createMock(Configuration.class);
410: bus.getConfiguration();
411: EasyMock.expectLastCall().andReturn(bc);
412: server.activate(callback);
413: }
414:
415: private void doRequestResponse(ClientTransport client,
416: byte outBytes[], boolean initial, boolean decoupled)
417: throws Exception {
418: OutputStreamMessageContext octx = doRequest(client, outBytes,
419: initial, decoupled);
420: InputStreamMessageContext ictx = client.invoke(octx);
421: doResponse(client, ictx, outBytes, decoupled);
422: }
423:
424: private OutputStreamMessageContext doRequest(
425: ClientTransport client, byte outBytes[], boolean initial,
426: boolean decoupled) throws Exception {
427: if (decoupled) {
428: if (initial) {
429: assertFalse(((HTTPClientTransport) client)
430: .hasDecoupledEndpoint());
431: EasyMock.reset(bus);
432: Configuration lc = EasyMock
433: .createMock(Configuration.class);
434: bus.getConfiguration();
435: EasyMock.expectLastCall().andReturn(lc);
436: EasyMock.replay(bus);
437: EasyMock.reset(clientBinding);
438: clientBinding.createResponseCallback();
439: EasyMock.expectLastCall().andReturn(responseCallback);
440: EasyMock.replay(clientBinding);
441: }
442:
443: EndpointReferenceType decoupledEndpoint = client
444: .getDecoupledEndpoint();
445: assertNotNull(decoupledEndpoint);
446: assertNotNull(decoupledEndpoint.getAddress());
447: assertEquals(decoupledEndpoint.getAddress().getValue(),
448: DECOUPLED_ADDRESS);
449: assertTrue(((HTTPClientTransport) client)
450: .hasDecoupledEndpoint());
451: assertSame(responseCallback, client.getResponseCallback());
452:
453: if (initial) {
454: EasyMock.verify(bus);
455: EasyMock.verify(clientBinding);
456: }
457: }
458: OutputStreamMessageContext octx = client
459: .createOutputStreamContext(new GenericMessageContext());
460: client.finalPrepareOutputStreamContext(octx);
461: octx.getOutputStream().write(outBytes);
462: return octx;
463: }
464:
465: private void doResponse(ClientTransport client,
466: InputStreamMessageContext ictx, byte outBytes[],
467: boolean decoupled) throws Exception {
468: if (decoupled) {
469: signalPartialResponseReceived();
470: doResponse(client, responseCallback.waitForNextResponse(),
471: outBytes);
472: } else {
473: doResponse(client, ictx, outBytes);
474: }
475: }
476:
477: private void doResponse(ClientTransport client,
478: InputStreamMessageContext ictx, byte outBytes[])
479: throws Exception {
480: byte bytes[] = new byte[10000];
481: int len = readBytes(bytes, ictx.getInputStream());
482: assertTrue("Did not read anything " + len, len > 0);
483: assertEquals(new String(outBytes), new String(bytes, 0, len));
484: }
485:
486: private void awaitPartialResponseReceived() throws Exception {
487: partialResponseReceivedLock.lock();
488: try {
489: while (!partialResponseReceivedNotified) {
490: partialResponseReceivedCondition.await();
491: }
492: } finally {
493: partialResponseReceivedNotified = false;
494: partialResponseReceivedLock.unlock();
495: }
496: }
497:
498: private void signalPartialResponseReceived() throws Exception {
499: partialResponseReceivedLock.lock();
500: try {
501: partialResponseReceivedNotified = true;
502: partialResponseReceivedCondition.signal();
503: } finally {
504: partialResponseReceivedLock.unlock();
505: }
506: }
507:
508: private HTTPTransportFactory createTransportFactory()
509: throws BusException {
510: EasyMock.reset(bus);
511: Configuration bc = EasyMock.createMock(Configuration.class);
512:
513: String transportId = "http://celtix.objectweb.org/transports/http/configuration";
514: ObjectFactory of = new ObjectFactory();
515: ClassNamespaceMappingListType mappings = of
516: .createClassNamespaceMappingListType();
517: ClassNamespaceMappingType mapping = of
518: .createClassNamespaceMappingType();
519: mapping
520: .setClassname("org.objectweb.celtix.bus.transports.http.HTTPTransportFactory");
521: mapping.getNamespace().add(transportId);
522: mappings.getMap().add(mapping);
523:
524: bus.getWSDLManager();
525: EasyMock.expectLastCall().andReturn(wsdlManager);
526: bus.getWSDLManager();
527: EasyMock.expectLastCall().andReturn(wsdlManager);
528: bus.getWSDLManager();
529: EasyMock.expectLastCall().andReturn(wsdlManager);
530:
531: BusLifeCycleManager lifecycleManager = EasyMock
532: .createNiceMock(BusLifeCycleManager.class);
533: bus.getLifeCycleManager();
534: EasyMock.expectLastCall().andReturn(lifecycleManager);
535: bus.getConfiguration();
536: EasyMock.expectLastCall().andReturn(bc);
537: bc.getObject("transportFactories");
538: EasyMock.expectLastCall().andReturn(mappings);
539: // check the transportFactoryManager create event
540: checkBusCreatedEvent();
541: EasyMock.replay(bus);
542: EasyMock.replay(bc);
543:
544: TransportFactoryManager tfm = new TransportFactoryManagerImpl(
545: bus);
546: return (HTTPTransportFactory) tfm
547: .getTransportFactory(transportId);
548: }
549:
550: private ClientTransport createClientTransport(URL wsdlUrl,
551: QName serviceName, String portName, String address,
552: boolean decoupled) throws WSDLException, IOException {
553: EasyMock.reset(bus);
554:
555: Configuration bc = EasyMock.createMock(Configuration.class);
556: Configuration pc = EasyMock.createMock(Configuration.class);
557:
558: bus.getConfiguration();
559: EasyMock.expectLastCall().andReturn(bc);
560: String id = serviceName.toString() + "/" + portName;
561: bc
562: .getChild(
563: "http://celtix.objectweb.org/bus/jaxws/port-config",
564: id);
565: EasyMock.expectLastCall().andReturn(pc);
566: pc
567: .getChild(
568: "http://celtix.objectweb.org/bus/transports/http/http-client-config",
569: "http-client");
570: EasyMock.expectLastCall().andReturn(null);
571: bus.getWSDLManager();
572: EasyMock.expectLastCall().andReturn(wsdlManager);
573: pc.getString("address");
574: EasyMock.expectLastCall().andReturn(address);
575:
576: checkBusCreatedEvent();
577:
578: EasyMock.replay(bus);
579: EasyMock.replay(bc);
580: EasyMock.replay(pc);
581:
582: EndpointReferenceType ref = EndpointReferenceUtils
583: .getEndpointReference(wsdlUrl, serviceName, portName);
584: ClientTransport transport = factory.createClientTransport(ref,
585: clientBinding);
586: if (decoupled) {
587: ((HTTPClientTransport) transport).policy
588: .setDecoupledEndpoint(DECOUPLED_ADDRESS);
589: }
590:
591: EasyMock.verify(bus);
592: EasyMock.verify(bc);
593: EasyMock.verify(pc);
594: return transport;
595:
596: }
597:
598: private ServerTransport createServerTransport(URL wsdlUrl,
599: QName serviceName, String portName, String address)
600: throws WSDLException, IOException {
601:
602: URL url = new URL(address);
603:
604: EasyMock.reset(bus);
605:
606: Configuration bc = EasyMock.createMock(Configuration.class);
607: Configuration ec = EasyMock.createMock(Configuration.class);
608:
609: bus.getConfiguration();
610: EasyMock.expectLastCall().andReturn(bc);
611: bc
612: .getChild(
613: "http://celtix.objectweb.org/bus/jaxws/endpoint-config",
614: serviceName.toString());
615: EasyMock.expectLastCall().andReturn(ec);
616: ec
617: .getChild(
618: "http://celtix.objectweb.org/bus/transports/http/http-server-config",
619: "http-server");
620: EasyMock.expectLastCall().andReturn(null);
621: bus.getWSDLManager();
622: EasyMock.expectLastCall().andReturn(wsdlManager);
623: if (first) {
624: //first call will configure the port listener
625: bus.getConfiguration();
626: EasyMock.expectLastCall().andReturn(bc);
627: bc
628: .getChild(
629: "http://celtix.objectweb.org/bus/transports/http/http-listener-config",
630: "http-listener." + url.getPort());
631: EasyMock.expectLastCall().andReturn(null);
632: first = false;
633: }
634:
635: try {
636: bus.addListener(isA(JettyHTTPServerTransport.class),
637: isA(ConfigurationEventFilter.class));
638: } catch (BusException e) {
639: // TODO nothing to do
640: }
641: EasyMock.expectLastCall();
642:
643: checkBusCreatedEvent();
644:
645: EasyMock.replay(bus);
646: EasyMock.replay(bc);
647: EasyMock.replay(ec);
648:
649: EndpointReferenceType ref = EndpointReferenceUtils
650: .getEndpointReference(wsdlUrl, serviceName, portName);
651: EndpointReferenceUtils.setAddress(ref, address);
652: ServerTransport transport = factory.createServerTransport(ref);
653:
654: EasyMock.verify(bus);
655: EasyMock.verify(bc);
656: EasyMock.verify(ec);
657:
658: return transport;
659:
660: }
661:
662: private class TestServerTransportCallback implements
663: ServerTransportCallback {
664: private ServerTransport server;
665: private boolean useAutomaticWorkQueue;
666: private int delay;
667: private byte[] buffer;
668: private boolean oneWay;
669: private boolean decoupled;
670:
671: TestServerTransportCallback(ServerTransport s, boolean uaq,
672: int d, byte[] b, boolean ow, boolean dc) {
673: server = s;
674: useAutomaticWorkQueue = uaq;
675: delay = d;
676: buffer = b;
677: oneWay = ow;
678: decoupled = dc;
679: }
680:
681: public void dispatch(InputStreamMessageContext ctx,
682: ServerTransport transport) {
683: try {
684: byte[] bytes = buffer;
685: if (null == bytes) {
686: bytes = new byte[10000];
687: }
688: int total = readBytes(bytes, ctx.getInputStream());
689:
690: OutputStreamMessageContext octx = null;
691: if (decoupled) {
692: EndpointReferenceType ref = new EndpointReferenceType();
693: EndpointReferenceUtils.setAddress(ref,
694: DECOUPLED_ADDRESS);
695: octx = server.rebase(ctx, ref);
696: server.finalPrepareOutputStreamContext(octx);
697: octx.getOutputStream().flush();
698: octx.getOutputStream().close();
699: assertEquals(
700: ctx
701: .get(HTTPServerInputStreamContext.HTTP_RESPONSE),
702: ref);
703: if (!oneWay) {
704: awaitPartialResponseReceived();
705: }
706: }
707:
708: if (oneWay) {
709: octx = transport.createOutputStreamContext(ctx);
710: octx.setOneWay(oneWay);
711: transport.finalPrepareOutputStreamContext(octx);
712: octx.getOutputStream().close();
713: transport.postDispatch(ctx, octx);
714: }
715:
716: // simulate implementor call
717: if (delay > 0) {
718: Thread.sleep(delay);
719: }
720:
721: if (!oneWay) {
722: octx = transport.createOutputStreamContext(ctx);
723: octx.setOneWay(oneWay);
724: transport.finalPrepareOutputStreamContext(octx);
725: octx.getOutputStream().write(bytes, 0, total);
726: octx.getOutputStream().flush();
727: octx.getOutputStream().close();
728: transport.postDispatch(ctx, octx);
729: }
730: } catch (Exception ex) {
731: ex.printStackTrace();
732: }
733: }
734:
735: public synchronized Executor getExecutor() {
736: EasyMock.reset(bus);
737: checkBusCreatedEvent();
738: EasyMock.replay(bus);
739: if (useAutomaticWorkQueue) {
740: if (queueManager == null) {
741: queueManager = new WorkQueueManagerImpl(bus);
742: }
743: return queueManager.getAutomaticWorkQueue();
744: } else {
745: return null;
746: }
747: }
748: }
749: }
|