001: package test;
002:
003: import dalma.endpoints.jms.JMSEndPoint;
004: import dalma.endpoints.jms.MessageHandler;
005: import dalma.test.WorkflowTestProgram;
006: import junit.textui.TestRunner;
007: import org.activemq.ActiveMQConnectionFactory;
008:
009: import javax.jms.JMSException;
010: import javax.jms.Message;
011: import javax.jms.Queue;
012: import javax.jms.QueueConnection;
013: import javax.jms.QueueSession;
014: import javax.jms.Session;
015: import javax.jms.TextMessage;
016: import java.io.Serializable;
017:
018: /**
019: * @author Kohsuke Kawaguchi
020: */
021: public class JMSTest extends WorkflowTestProgram implements
022: MessageHandler {
023: public JMSTest(String name) throws Exception {
024: super (name);
025: }
026:
027: public static void main(String[] args) throws Exception {
028: TestRunner.run(JMSTest.class);
029: }
030:
031: JMSEndPoint ep1;
032: JMSEndPoint ep2;
033: QueueSession qs;
034: QueueConnection qcon;
035:
036: protected void setupEndPoints() throws Exception {
037: qcon = new ActiveMQConnectionFactory("tcp://localhost:61616")
038: .createQueueConnection();
039: qs = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
040: Queue out = qs.createQueue("dalma-out");
041: Queue in = qs.createQueue("dalma-in");
042:
043: ep1 = new JMSEndPoint("jms1", qs, out, in);
044: engine.addEndPoint(ep1);
045:
046: ep2 = new JMSEndPoint("jms2", qs, in, out);
047: ep2.setNewMessageHandler(this );
048: engine.addEndPoint(ep2);
049:
050: qcon.start();
051: }
052:
053: public void test() throws Throwable {
054: createConversation(Alice.class, ep1);
055:
056: // for now
057: Thread.sleep(3000);
058: engine.waitForCompletion();
059: }
060:
061: protected void tearDown() throws Exception {
062: try {
063: super .tearDown();
064: } finally {
065: qs.close();
066: qcon.close();
067: }
068: }
069:
070: public void onNewMessage(Message message) throws Exception {
071: System.out.println("new message");
072: createConversation(Bob.class, ep2, message);
073: }
074:
075: /**
076: * Activating side.
077: */
078: public static final class Alice implements Runnable, Serializable {
079: private final JMSEndPoint ep;
080:
081: public Alice(JMSEndPoint ep) {
082: this .ep = ep;
083: }
084:
085: public void run() {
086: try {
087: System.out.println("A: Hello");
088: TextMessage msg = ep.createMessage(TextMessage.class);
089: msg.setText("A:Hello");
090: msg = (TextMessage) ep.waitForReply(msg);
091:
092: System.out.println("A: Got " + msg.getText());
093: assertTrue(msg.getText().contains("B:"));
094: System.out.println("A: Bye");
095: msg = ep.createReplyMessage(TextMessage.class, msg);
096: msg.setText("A: Bye");
097: ep.send(msg);
098: } catch (JMSException e) {
099: throw new Error(e);
100: }
101: }
102: }
103:
104: /**
105: * Passive side.
106: */
107: public static final class Bob implements Runnable, Serializable {
108: private final JMSEndPoint ep;
109:
110: // initial msg
111: private TextMessage msg;
112:
113: public Bob(JMSEndPoint ep, TextMessage email) {
114: this .ep = ep;
115: this .msg = email;
116: }
117:
118: public void run() {
119: try {
120: TextMessage msg = this .msg;
121:
122: System.out.println("B: Got " + msg.getText());
123: assertTrue(msg.getText().contains("A:"));
124:
125: TextMessage reply = ep.createReplyMessage(
126: TextMessage.class, msg);
127: reply.setText("B: Hello back");
128: System.out.println("B: Hello back");
129: msg = (TextMessage) ep.waitForReply(reply);
130:
131: System.out.println("B: Got " + msg.getText());
132: assertTrue(msg.getText().contains("A:"));
133: System.out.println("B: dying");
134: } catch (Exception e) {
135: throw new Error(e);
136: }
137: }
138:
139: private static final long serialVersionUID = 1L;
140: }
141: }
|