001: package dalma.endpoints.jms;
002:
003: import dalma.EndPoint;
004: import dalma.ReplyIterator;
005: import dalma.Engine;
006: import dalma.endpoints.jms.impl.BytesMessageImpl;
007: import dalma.endpoints.jms.impl.MapMessageImpl;
008: import dalma.endpoints.jms.impl.MessageImpl;
009: import dalma.endpoints.jms.impl.ObjectMessageImpl;
010: import dalma.endpoints.jms.impl.StreamMessageImpl;
011: import dalma.endpoints.jms.impl.TextMessageImpl;
012: import dalma.spi.port.MultiplexedEndPoint;
013:
014: import javax.jms.BytesMessage;
015: import javax.jms.Destination;
016: import javax.jms.JMSException;
017: import javax.jms.MapMessage;
018: import javax.jms.Message;
019: import javax.jms.MessageConsumer;
020: import javax.jms.MessageListener;
021: import javax.jms.MessageProducer;
022: import javax.jms.ObjectMessage;
023: import javax.jms.QueueSession;
024: import javax.jms.StreamMessage;
025: import javax.jms.TextMessage;
026: import javax.jms.Session;
027: import java.util.Date;
028: import java.util.logging.Level;
029:
030: /**
031: * {@link EndPoint} that connects to two JMS queues.
032: *
033: * @author Kohsuke Kawaguchi
034: */
035: public class JMSEndPoint extends MultiplexedEndPoint<String, Message>
036: implements MessageListener {
037: private final Session session;
038:
039: private final MessageProducer sender;
040: private final MessageConsumer consumer;
041:
042: /**
043: * Handles uncorrelated messages.
044: */
045: private MessageHandler newMessageHandler;
046:
047: /**
048: * Creates a new {@link JMSEndPoint}.
049: *
050: * @param name
051: * name that uniquely identifies endpoints inside an {@link Engine}.
052: * must not be null.
053: * @param session
054: * JMS messages are sent/received through this session. must not be null.
055: * @param out
056: * The default {@link Destination} where out-going messages are sent to.
057: * can be null, in which case every out-going message must have a
058: * {@link Destination} set (such as when every outgoing message is a reply.)
059: * @param in
060: * The {@link Destination} where in-coming messages are picked up.
061: * must not be null.
062: */
063: public JMSEndPoint(String name, Session session, Destination out,
064: Destination in) throws JMSException {
065: super (name);
066: this .session = session;
067: sender = session.createProducer(out);
068: consumer = session.createConsumer(in);
069: consumer.setMessageListener(this );
070: }
071:
072: protected void start() {
073: // TODO: shall we control queue connection by ourselves, so that we can start it here?
074: }
075:
076: protected void stop() {
077: try {
078: consumer.close();
079: } catch (JMSException e) {
080: throw new Error(e); // what else can we do?
081: }
082: try {
083: sender.close();
084: } catch (JMSException e) {
085: throw new Error(e); // what else can we do?
086: }
087: }
088:
089: /**
090: * Invoked by JMS.
091: */
092: public void onMessage(Message message) {
093: try {
094: super .handleMessage(wrap(message));
095: } catch (JMSException e) {
096: logger.log(Level.WARNING,
097: "JMSEndPoint encountered an JMS error", e);
098: }
099: }
100:
101: protected String getKey(Message msg) {
102: try {
103: return msg.getJMSCorrelationID();
104: } catch (JMSException e) {
105: throw new QueueException(e);
106: }
107: }
108:
109: protected void onNewMessage(Message msg) {
110: MessageHandler h = newMessageHandler;
111: if (h != null) {
112: try {
113: h.onNewMessage(msg);
114: } catch (Exception e) {
115: logger.log(Level.WARNING, e.getMessage(), e);
116: }
117: }
118: }
119:
120: /**
121: * Sends a message and returns immediately.
122: */
123: public String send(Message msg) {
124: try {
125: Message providerMsg = unwrap(msg);
126: sender.send(providerMsg);
127: if (msg instanceof MessageImpl) {
128: // JMS sets various properties as a result of the send operation
129: // propagate them back to the message
130: ((MessageImpl) msg).wrap(providerMsg);
131: }
132: return providerMsg.getJMSMessageID();
133: } catch (JMSException e) {
134: throw new QueueException(e);
135: }
136: }
137:
138: //
139: //
140: // API methods
141: //
142: //
143:
144: /**
145: * Gets the last value set by {@link #setNewMessageHandler(MessageHandler)}.
146: */
147: public MessageHandler getNewMessageHandler() {
148: return newMessageHandler;
149: }
150:
151: /**
152: * Sets {@link MessageHandler} that handles uncorrelated messages
153: * received by this endpoint.
154: *
155: * @param newMessageHandler
156: * if null, uncorrelated messages are discarded.
157: */
158: public void setNewMessageHandler(MessageHandler newMessageHandler) {
159: this .newMessageHandler = newMessageHandler;
160: }
161:
162: /**
163: * Creates a new blank {@link Message} of the specified type.
164: *
165: * @param type
166: * one of 5 {@link Message}-derived types defined in JMS.
167: */
168: public <T extends Message> T createMessage(Class<T> type) {
169: if (type == BytesMessage.class)
170: return type.cast(new BytesMessageImpl());
171: if (type == MapMessage.class)
172: return type.cast(new MapMessageImpl());
173: if (type == ObjectMessage.class)
174: return type.cast(new ObjectMessageImpl());
175: if (type == StreamMessage.class)
176: return type.cast(new StreamMessageImpl());
177: if (type == TextMessage.class)
178: return type.cast(new TextMessageImpl());
179: throw new IllegalArgumentException();
180: }
181:
182: /**
183: * Wraps a provider-specific JMS Message object into our serializable wrapper.
184: */
185: private <T extends Message> T wrap(T msg) throws JMSException {
186: if (msg instanceof BytesMessage)
187: return (T) new BytesMessageImpl().wrap((BytesMessage) msg);
188: if (msg instanceof MapMessage)
189: return (T) new MapMessageImpl().wrap((MapMessage) msg);
190: if (msg instanceof ObjectMessage)
191: return (T) new ObjectMessageImpl()
192: .wrap((ObjectMessage) msg);
193: if (msg instanceof StreamMessage)
194: return (T) new StreamMessageImpl()
195: .wrap((StreamMessage) msg);
196: if (msg instanceof TextMessage)
197: return (T) new TextMessageImpl().wrap((TextMessage) msg);
198: throw new IllegalArgumentException();
199: }
200:
201: /**
202: * Unwraps our serializable wrapper into a provider-specific JMS Message.
203: */
204: private <T extends Message> T unwrap(T msg) throws JMSException {
205: if (msg instanceof BytesMessageImpl) {
206: BytesMessage r = session.createBytesMessage();
207: ((BytesMessageImpl) msg).writeTo(r);
208: return (T) r;
209: }
210: if (msg instanceof MapMessageImpl) {
211: MapMessage r = session.createMapMessage();
212: ((MapMessageImpl) msg).writeTo(r);
213: return (T) r;
214: }
215: if (msg instanceof ObjectMessage) {
216: ObjectMessage r = session.createObjectMessage();
217: ((ObjectMessageImpl) msg).writeTo(r);
218: return (T) r;
219: }
220: if (msg instanceof StreamMessage) {
221: StreamMessage r = session.createStreamMessage();
222: ((StreamMessageImpl) msg).writeTo(r);
223: return (T) r;
224: }
225: if (msg instanceof TextMessage) {
226: TextMessage r = session.createTextMessage();
227: ((TextMessageImpl) msg).writeTo(r);
228: return (T) r;
229: }
230: return msg;
231: }
232:
233: /**
234: * Creates a reply to the specified message.
235: */
236: public <T extends Message> T createReplyMessage(Class<T> type,
237: Message in) throws JMSException {
238: T reply = createMessage(type);
239: reply.setJMSCorrelationID(in.getJMSMessageID());
240: return reply;
241: }
242:
243: public Message waitForReply(Message msg) {
244: return super .waitForReply(msg);
245: }
246:
247: public Message waitForReply(Message msg, Date timeout) {
248: return super .waitForReply(msg, timeout);
249: }
250:
251: public ReplyIterator<Message> waitForMultipleReplies(
252: Message outgoing, Date expirationDate) {
253: return super.waitForMultipleReplies(outgoing, expirationDate);
254: }
255: }
|