001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)TestDeliveryChannelImpl.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.messaging;
030:
031: import java.net.URI;
032:
033: import javax.jbi.component.Component;
034: import javax.jbi.component.ComponentContext;
035:
036: import javax.jbi.messaging.ExchangeStatus;
037: import javax.jbi.messaging.InOnly;
038: import javax.jbi.messaging.InOptionalOut;
039: import javax.jbi.messaging.InOut;
040: import javax.jbi.messaging.MessageExchange;
041: import javax.jbi.messaging.MessageExchangeFactory;
042: import javax.jbi.messaging.RobustInOnly;
043:
044: import javax.jbi.servicedesc.ServiceEndpoint;
045:
046: import javax.xml.namespace.QName;
047:
048: import javax.management.openmbean.CompositeData;
049: import javax.management.openmbean.CompositeDataSupport;
050: import javax.management.openmbean.CompositeType;
051: import javax.management.openmbean.SimpleType;
052: import javax.management.openmbean.OpenType;
053:
054: /**
055: * Tests for the DeliveryChannelImpl class
056: *
057: * @author Sun Microsystems, Inc.
058: */
059: public class TestDeliveryChannelImpl extends junit.framework.TestCase {
060: /** Component ID for test channel. */
061: private static final String ID_A = "ChannelA";
062: /** Component ID for test channel. */
063: private static final String ID_B = "ChannelB";
064: /** Component ID for test channel. */
065: private static final String ID_C = "Observer";
066:
067: /** Service and endpoint constants. */
068: private static final QName OPERATION = new QName("operation");
069: private static final QName SERVICE_A = new QName("service-a");
070: private static final String ENDPOINT_A = "endpoint-a";
071: private static final QName SERVICE_B = new QName("service-b");
072: private static final String ENDPOINT_B = "endpoint-b";
073: private static final QName SERVICE_FOO = new QName("foo");
074: private static final String ENDPOINT_BAR = "bar";
075:
076: /** NMS impl */
077: private MessageService mMsgSvc;
078: /** NMR Environment Context */
079: private NMRContext mContext;
080: /** Test channel which is created/destroyed for each test. */
081: private DeliveryChannelImpl mChannelA;
082: /** Test channel which is created/destroyed for each test. */
083: private DeliveryChannelImpl mChannelB;
084: /** Test channel which is created/destroyed for each test. */
085: private DeliveryChannelImpl mChannelC;
086: /** Exchange factory for channel A. */
087: private MessageExchangeFactory mFactoryA;
088: /** Endpoint Reference on Channel A */
089: private RegisteredEndpoint mEndpointA;
090: /** Endpoint Reference on Channel B */
091: private RegisteredEndpoint mEndpointB;
092:
093: /**
094: * The constructor for this testcase, forwards the test name to
095: * the jUnit TestCase base class.
096: * @param aTestName String with the name of this test.
097: */
098: public TestDeliveryChannelImpl(String aTestName) {
099: super (aTestName);
100:
101: mMsgSvc = new MessageService();
102: mContext = new NMRContext(mMsgSvc);
103: }
104:
105: /**
106: * Setup for the test. This creates the ComponentRegistry instance
107: * and other objects needed for the tests.
108: * @throws Exception when set up fails for any reason.
109: */
110: public void setUp() throws Exception {
111: super .setUp();
112:
113: mMsgSvc.initService(mContext);
114: mMsgSvc.startService();
115:
116: // create test channels and add them to the NMS routing table
117: mChannelA = new DeliveryChannelImpl(ID_A, null, mMsgSvc, null);
118: mChannelB = new DeliveryChannelImpl(ID_B, null, mMsgSvc, null);
119: mChannelC = new DeliveryChannelImpl(ID_C, null, mMsgSvc, null);
120: mMsgSvc.addChannel(mChannelA);
121: mMsgSvc.addChannel(mChannelB);
122: mMsgSvc.addChannel(mChannelC);
123:
124: mFactoryA = mChannelA.createExchangeFactory();
125:
126: mEndpointA = mChannelA.activateEndpoint(SERVICE_A, ENDPOINT_A);
127: mEndpointB = mChannelB.activateEndpoint(SERVICE_B, ENDPOINT_B);
128: }
129:
130: /**
131: * Cleanup for the test.
132: * @throws Exception when tearDown fails for any reason.
133: */
134: public void tearDown() throws Exception {
135: super .tearDown();
136:
137: mChannelA.close();
138: mChannelB.close();
139: mChannelC.close();
140:
141: mMsgSvc.stopService();
142: mContext.reset();
143: }
144:
145: // ============================= test methods ================================
146:
147: /**
148: * Test accept on a channel with a pending exchange.
149: * @throws Exception if an unexpected error occurs
150: */
151: public void testAccept() throws Exception {
152: InOnly me1;
153: InOnly me2;
154:
155: me1 = mFactoryA.createInOnlyExchange();
156: me1.setProperty("test", "keith");
157: me1.setEndpoint(mEndpointB);
158: me1.setOperation(OPERATION);
159: mChannelA.send(me1);
160:
161: me2 = (InOnly) mChannelB.accept();
162:
163: // check to make sure we got something back
164: assertTrue(me2 != null);
165:
166: // simple test to see if we got the same thing back
167: assertTrue(me2.getProperty("test") != null);
168: }
169:
170: /**
171: * Make sure accept returns when the channel is closed.
172: * @throws Exception if an unexpected error occurs
173: */
174: public void testAcceptOnClose() throws Exception {
175: DeliveryChannelImpl channel;
176: Acceptor acceptor;
177: Exception error = null;
178:
179: channel = new DeliveryChannelImpl("foo", null, mMsgSvc, null);
180: acceptor = new Acceptor(channel);
181:
182: // start the accept and then close the channel
183: acceptor.start();
184:
185: // give the thread a chance to run
186: Thread.sleep(200);
187:
188: // close the channel to release the blocking accept
189: channel.close();
190:
191: // give the close() method a chance to acquire the lock
192: Thread.sleep(200);
193:
194: // make sure the thread didn't abend
195: if (acceptor.hasError()) {
196: fail(acceptor.getError().toString());
197: }
198:
199: // see if our accept thread is still running
200: assertTrue(!acceptor.isAlive());
201: }
202:
203: /**
204: * testDeactivateEndpoint
205: * @throws Exception if an unexpected error occurs
206: */
207: public void testDeactivateEndpointGood() throws Exception {
208: RegisteredEndpoint a;
209: RegisteredEndpoint b;
210:
211: // deactivate inbound and outbound endpoints
212: mChannelA.deactivateEndpoint(mEndpointA);
213: mChannelB.deactivateEndpoint(mEndpointB);
214:
215: a = (RegisteredEndpoint) mEndpointA;
216: b = (RegisteredEndpoint) mEndpointB;
217:
218: // make sure the deactivation took place
219: assertTrue(!a.isActive());
220: assertTrue(!b.isActive());
221: }
222:
223: public void testDeactivateEndpointBad() throws Exception {
224: // try to deactivate channel B's reference using channel A
225: try {
226: mChannelA.deactivateEndpoint(mEndpointB);
227: fail("able to deactivate endpoint that I didn't activate");
228: } catch (javax.jbi.messaging.MessagingException msgEx) {
229: }
230:
231: // make sure the ref wasn't deactivated, even though an exception was thrown
232: if (!((RegisteredEndpoint) mEndpointB).isActive()) {
233: fail("able to deactivate endpoint that I didn't activate");
234: }
235: }
236:
237: /**
238: * testActivateEndpointGood
239: * @throws Exception if an unexpected error occurs
240: */
241: public void testActivateEndpointGood() throws Exception {
242: RegisteredEndpoint re;
243:
244: re = (RegisteredEndpoint) mChannelA.activateEndpoint(
245: SERVICE_FOO, ENDPOINT_BAR);
246:
247: assertTrue(re.isActive());
248: }
249:
250: /**
251: * testActivateEndpointBad
252: * @throws Exception if an unexpected error occurs
253: */
254: public void testActivateEndpointBad() throws Exception {
255: ServiceEndpoint er;
256:
257: try {
258: er = mChannelA.activateEndpoint(SERVICE_B, ENDPOINT_B);
259: fail("able to activate a duplicate endpoint");
260: } catch (javax.jbi.messaging.MessagingException msgEx) {
261: }
262: }
263:
264: /**
265: * Happy path for send.
266: * @throws Exception if an unexpected error occurs
267: */
268: public void testSendGood() throws Exception {
269: InOnly me1;
270:
271: me1 = mFactoryA.createInOnlyExchange();
272: me1.setEndpoint(mEndpointB);
273: me1.setOperation(OPERATION);
274: mChannelA.send(me1);
275:
276: // Attempt to receive the exchange on channel B
277: assertTrue(mChannelB.accept() != null);
278: }
279:
280: /**
281: * Failure path for send.
282: * @throws Exception if an unexpected error occurs
283: */
284: public void testSendBad() throws Exception {
285: InOnly me1;
286:
287: // close channel A
288: mChannelA.close();
289:
290: me1 = mFactoryA.createInOnlyExchange();
291: me1.setEndpoint(mEndpointB);
292: me1.setOperation(OPERATION);
293:
294: // try to send on a closed channel
295: try {
296: mChannelA.send(me1);
297: fail("Able to send exchange on a closed channel!");
298: } catch (javax.jbi.messaging.MessagingException msgEx) {
299: }
300: }
301:
302: public void testSendServiceGood() throws Exception {
303: InOnly me1;
304:
305: me1 = mFactoryA.createInOnlyExchange();
306: me1.setService(SERVICE_B);
307: me1.setOperation(OPERATION);
308: mChannelA.send(me1);
309:
310: // Attempt to receive the exchange on channel B
311: assertTrue(mChannelB.accept() != null);
312: }
313:
314: public void testSendServiceBad() throws Exception {
315: InOnly me1;
316:
317: me1 = mFactoryA.createInOnlyExchange();
318: me1.setService(new QName("BOGUS"));
319: me1.setOperation(OPERATION);
320:
321: // try to send on a closed channel
322: try {
323: mChannelA.send(me1);
324: fail("Able to send exchange with bogus service!");
325: } catch (javax.jbi.messaging.MessagingException msgEx) {
326: }
327: }
328:
329: public void testSendInterfaceGood() throws Exception {
330: final QName INTERFACE_NAME = new QName("myinterface");
331:
332: InOnly me1;
333:
334: // add interface QName to service endpoint
335: mEndpointB.setInterfaces(new QName[] { INTERFACE_NAME });
336:
337: me1 = mFactoryA.createInOnlyExchange();
338: me1.setInterfaceName(INTERFACE_NAME);
339: me1.setOperation(OPERATION);
340: mChannelA.send(me1);
341:
342: // Attempt to receive the exchange on channel B
343: assertTrue(mChannelB.accept() != null);
344: }
345:
346: public void testSendInterfaceBad() throws Exception {
347: InOnly me1;
348:
349: me1 = mFactoryA.createInOnlyExchange();
350: me1.setService(new QName("RUBBISH"));
351: me1.setOperation(OPERATION);
352:
353: // try to send on a closed channel
354: try {
355: mChannelA.send(me1);
356: fail("Able to send exchange with rubbish interface!");
357: } catch (javax.jbi.messaging.MessagingException msgEx) {
358: }
359: }
360:
361: /**
362: * Check that the transactional properties are supported.
363: * @throws Exception if unexcepted error occurs.
364: */
365: public void testTransactional() throws Exception {
366: assertTrue(mChannelA.isTransactional());
367: mChannelA.setTransactional(false);
368: assertTrue(!mChannelA.isTransactional());
369: mChannelA.setTransactional(true);
370: assertTrue(mChannelA.isTransactional());
371: }
372:
373: public void testSendInterfaceWsdl20() throws Exception {
374: String id = "testcomp";
375: Component com;
376: ComponentContext ctx;
377: MessageExchange me;
378:
379: // create a component, add it to env, and get it's context object
380: com = new NMRComponent();
381: mContext.addComponentInstance(id, com);
382: ctx = mContext.getComponentContext(id);
383:
384: // activate an endpoint and create an exchange for one of its operations
385: ctx.activateEndpoint(WsdlDocument.STOCK_SERVICE_Q,
386: WsdlDocument.STOCK_ENDPOINT);
387: me = mChannelA.createExchangeFactory().createInOutExchange();
388:
389: me.setInterfaceName(WsdlDocument.STOCK_INTERFACE_Q);
390: me.setOperation(WsdlDocument.STOCK_OPERATION_Q);
391:
392: mChannelA.send(me);
393:
394: assertTrue(ctx.getDeliveryChannel().accept() != null);
395: }
396:
397: public void testSendInterfaceWsdl11() throws Exception {
398: String id = "testcomp";
399: Component com;
400: ComponentContext ctx;
401: MessageExchange me;
402:
403: // create a component, add it to env, and get it's context object
404: com = new NMRComponent(NMRComponent.WSDL_11);
405: mContext.addComponentInstance(id, com);
406: ctx = mContext.getComponentContext(id);
407:
408: // activate an endpoint and create an exchange for one of its operations
409: ctx.activateEndpoint(WsdlDocument.STOCK_SERVICE_Q,
410: WsdlDocument.STOCK_ENDPOINT);
411: me = mChannelA.createExchangeFactory().createInOutExchange();
412:
413: me.setInterfaceName(WsdlDocument.STOCK_INTERFACE_Q);
414: me.setOperation(WsdlDocument.STOCK_OPERATION_Q);
415:
416: mChannelA.send(me);
417:
418: assertTrue(ctx.getDeliveryChannel().accept() != null);
419: }
420:
421: /** Make sure that all aspects of a service endpoint (including interface
422: * information) are cleaned up when the endpoint is deactivated).
423: */
424: public void testRemoveEndpointWSDL() throws Exception {
425: String id = "testcomp";
426: Component com;
427: ComponentContext ctx;
428: ServiceEndpoint se;
429:
430: // create a component, add it to env, and get it's context object
431: com = new NMRComponent(NMRComponent.WSDL_11);
432: mContext.addComponentInstance(id, com);
433: ctx = mContext.getComponentContext(id);
434:
435: // activate an endpoint and verify it exists
436: se = ctx.activateEndpoint(WsdlDocument.STOCK_SERVICE_Q,
437: WsdlDocument.STOCK_ENDPOINT);
438: assertTrue(ctx.getEndpoints(WsdlDocument.STOCK_INTERFACE_Q).length == 1);
439:
440: // deactivate and verify that the endpoint no longer exists
441: ctx.deactivateEndpoint(se);
442: assertTrue(ctx
443: .getEndpointsForService(WsdlDocument.STOCK_SERVICE_Q).length == 0);
444: assertTrue(ctx.getEndpoints(WsdlDocument.STOCK_INTERFACE_Q).length == 0);
445: }
446:
447: /**
448: * Test Observer
449: * @throws Exception if an unexpected error occurs
450: */
451: public void testObserver() throws Exception {
452: InOnly me1;
453: MessageExchange me;
454:
455: // Setup an Observer.
456:
457: mMsgSvc.addObserver(mChannelC);
458:
459: // Perform a simple exchange.
460:
461: me1 = mFactoryA.createInOnlyExchange();
462: me1.setEndpoint(mEndpointB);
463: me1.setOperation(OPERATION);
464: mChannelA.send(me1);
465: me1 = (InOnly) mChannelB.accept();
466: me1.setStatus(ExchangeStatus.DONE);
467: mChannelB.send(me1);
468: mChannelA.accept();
469:
470: // Now check the Observer results.
471: me = mChannelC.accept();
472: assertTrue(me instanceof Observer);
473: assertTrue(me.getRole().equals(MessageExchange.Role.CONSUMER));
474: assertTrue(me.getStatus().equals(ExchangeStatus.ACTIVE));
475: assertTrue(me.getOperation().equals(OPERATION));
476: assertTrue(me.getProperty("com.sun.jbi.observer.sender")
477: .equals("ChannelA"));
478: assertTrue(me.getProperty("com.sun.jbi.observer.receiver")
479: .equals("ChannelB"));
480: assertTrue(me.getPattern().equals(
481: ExchangePattern.IN_ONLY.getURI()));
482:
483: me = mChannelC.accept();
484: assertTrue(me instanceof Observer);
485: assertTrue(me.getRole().equals(MessageExchange.Role.PROVIDER));
486: assertTrue(me.getStatus().equals(ExchangeStatus.DONE));
487: assertTrue(me.getOperation().equals(OPERATION));
488: assertTrue(me.getProperty("com.sun.jbi.observer.sender")
489: .equals("ChannelB"));
490: assertTrue(me.getProperty("com.sun.jbi.observer.receiver")
491: .equals("ChannelA"));
492: assertTrue(me.getPattern().equals(
493: ExchangePattern.IN_ONLY.getURI()));
494: assertTrue(mChannelC.accept(1) == null);
495:
496: mMsgSvc.removeObserver(mChannelC);
497:
498: // Perform a simple exchange.
499:
500: me1 = mFactoryA.createInOnlyExchange();
501: me1.setEndpoint(mEndpointB);
502: me1.setOperation(OPERATION);
503: mChannelA.send(me1);
504: me1 = (InOnly) mChannelB.accept();
505: me1.setStatus(ExchangeStatus.DONE);
506: mChannelB.send(me1);
507: mChannelA.accept();
508:
509: // Make sure that be don't observe this.
510: assertTrue(mChannelC.accept(1) == null);
511: }
512:
513: public void testStatistics() throws Exception {
514: InOnly me1;
515:
516: me1 = mFactoryA.createInOnlyExchange();
517: me1.setService(SERVICE_B);
518: me1.setOperation(OPERATION);
519: mChannelA.send(me1);
520:
521: // Attempt to receive the exchange on channel B
522: assertTrue(mChannelB.accept() != null);
523:
524: CompositeData cd = mChannelA.getStatistics();
525: assertTrue(cd != null);
526: assertEquals(37, cd.values().size());
527: }
528:
529: public void testEndpointStatistics() throws Exception {
530: InOnly me1;
531:
532: me1 = mFactoryA.createInOnlyExchange();
533: me1.setService(SERVICE_B);
534: me1.setOperation(OPERATION);
535: mChannelA.send(me1);
536:
537: // Attempt to receive the exchange on channel B
538: assertTrue(mChannelB.accept() != null);
539:
540: String[] c = mChannelB.getEndpointNames();
541: for (int i = 0; i < c.length; i++) {
542: EndpointStatistics es = mChannelB
543: .getEndpointStatistics(c[i]);
544: assertTrue(es != null);
545: CompositeData cd = es.getStatistics();
546: assertTrue(cd != null);
547: assertEquals(30, cd.values().size());
548: }
549: }
550:
551: /** Tests thread-safety of DeliveryChannelImpl with concurrent use.
552: */
553: public void testConcurrency() throws Exception {
554: MessageExchangeFactory mcf;
555:
556: InOnlyInitiator ioi1;
557: InOnlyInitiator ioi2;
558: InOnlyInitiator ioi3;
559: InOnlyInitiator ioi4;
560: InOnlyServicer ios1;
561: InOnlyServicer ios2;
562: InOnlyServicer ios3;
563: InOnlyServicer ios4;
564:
565: mcf = mChannelA.createExchangeFactory(mEndpointB);
566:
567: ioi1 = new InOnlyInitiator("A", mChannelA, mcf
568: .createInOnlyExchange());
569: ios1 = new InOnlyServicer(mChannelB);
570:
571: ioi2 = new InOnlyInitiator("B", mChannelA, mcf
572: .createInOnlyExchange());
573: ioi3 = new InOnlyInitiator("C", mChannelA, mcf
574: .createInOnlyExchange());
575: ioi4 = new InOnlyInitiator("D", mChannelA, mcf
576: .createInOnlyExchange());
577:
578: ios2 = new InOnlyServicer(mChannelB);
579: ios3 = new InOnlyServicer(mChannelB);
580: ios4 = new InOnlyServicer(mChannelB);
581:
582: ioi1.verifySuccess();
583: ioi2.verifySuccess();
584: ioi3.verifySuccess();
585: ioi4.verifySuccess();
586:
587: }
588:
589: //------------------------ Supporting Classes -------------------------
590:
591: class Acceptor extends Thread {
592: DeliveryChannelImpl mChannel;
593: Exception mError;
594:
595: Acceptor(DeliveryChannelImpl channel) {
596: mChannel = channel;
597: }
598:
599: public void run() {
600: try {
601: mChannel.accept();
602: } catch (Exception ex) {
603: mError = ex;
604: }
605: }
606:
607: public Exception getError() {
608: return mError;
609: }
610:
611: public boolean hasError() {
612: return mError != null;
613: }
614: }
615:
616: /** Initiating party for concurrency test. */
617: class InOnlyInitiator extends Thread {
618: String mId;
619: DeliveryChannelImpl mChannel;
620: MessageExchange mExchange;
621: Exception mError;
622:
623: InOnlyInitiator(String id, DeliveryChannelImpl channel,
624: MessageExchange exchange) {
625: mId = id;
626: mChannel = channel;
627: mExchange = exchange;
628:
629: mExchange.setOperation(OPERATION);
630: start();
631: }
632:
633: public void run() {
634: try {
635: // send exchange
636: mChannel.send(mExchange);
637: // accept it back with status
638: mExchange = mChannel.accept();
639: } catch (Exception ex) {
640: mError = ex;
641: }
642: }
643:
644: public void verifySuccess() throws Exception {
645: // make sure we're done
646: join(5000);
647:
648: if (mError != null) {
649: throw mError;
650: } else if (mExchange == null) {
651: throw new Exception("Initiator " + mId
652: + " received null exchange from accept");
653: } else {
654: System.out.println("Iniator " + mId
655: + " completed successfully.");
656: }
657: }
658: }
659:
660: /** Servicing party for concurrency test. */
661: class InOnlyServicer extends Thread {
662: DeliveryChannelImpl mChannel;
663: Exception mError;
664:
665: InOnlyServicer(DeliveryChannelImpl channel) {
666: mChannel = channel;
667: start();
668: }
669:
670: public void run() {
671: try {
672: MessageExchange me = mChannel.accept();
673: me.setStatus(ExchangeStatus.DONE);
674: mChannel.send(me);
675: } catch (Exception ex) {
676: ex.printStackTrace();
677: }
678: }
679:
680: public void verifySuccess() throws Exception {
681: if (mError != null) {
682: throw mError;
683: }
684: }
685: }
686: }
|