001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.bpmscript.jms;
019:
020: import java.util.ArrayList;
021: import java.util.Enumeration;
022: import java.util.HashSet;
023: import java.util.List;
024: import java.util.Set;
025: import java.util.concurrent.Callable;
026: import java.util.concurrent.ExecutionException;
027: import java.util.concurrent.ExecutorService;
028: import java.util.concurrent.Executors;
029: import java.util.concurrent.Future;
030:
031: import javax.jms.BytesMessage;
032: import javax.jms.Connection;
033: import javax.jms.ConnectionFactory;
034: import javax.jms.Destination;
035: import javax.jms.JMSException;
036: import javax.jms.MapMessage;
037: import javax.jms.Message;
038: import javax.jms.MessageConsumer;
039: import javax.jms.MessageListener;
040: import javax.jms.ObjectMessage;
041: import javax.jms.Queue;
042: import javax.jms.QueueBrowser;
043: import javax.jms.Session;
044: import javax.jms.StreamMessage;
045: import javax.jms.Topic;
046:
047: import org.springframework.beans.factory.DisposableBean;
048: import org.springframework.beans.factory.InitializingBean;
049:
050: public final class JmsTemplate implements InitializingBean,
051: DisposableBean {
052:
053: protected final transient org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
054: .getLog(getClass());
055:
056: public JmsTemplate(ConnectionFactory connectionFactory) {
057: this .connectionFactory = connectionFactory;
058: }
059:
060: public JmsTemplate() {
061:
062: }
063:
064: private ExecutorService executor = Executors
065: .newSingleThreadExecutor();
066:
067: private Session singletonSession;
068: private ConnectionFactory connectionFactory;
069: private Connection connection = null;
070: private boolean transacted = false;
071: private int ackMode = Session.AUTO_ACKNOWLEDGE;
072: private String username;
073: private String password;
074:
075: public Object execute(SessionCallback callback)
076: throws JmsTemplateException {
077: Session session = null;
078: try {
079: session = connection.createSession(transacted, ackMode);
080: Object result = callback.doInJms(session);
081: return result;
082: } catch (Exception e) {
083: throw new JmsTemplateException(e);
084: } finally {
085: if (session != null) {
086: try {
087: session.close();
088: } catch (JMSException e) {
089: log.error(e, e);
090: }
091: }
092: }
093: }
094:
095: public Object executeTransacted(SessionCallback callback)
096: throws JmsTemplateException {
097: Session session = null;
098: try {
099: session = connection.createSession(true, ackMode);
100: try {
101: Object result = callback.doInJms(session);
102: session.commit();
103: return result;
104: } catch (Exception e) {
105: session.rollback();
106: throw e;
107: } catch (Throwable e) {
108: session.rollback();
109: throw new Exception(e);
110: }
111: } catch (Exception e) {
112: throw new JmsTemplateException(e);
113: } finally {
114: if (session != null) {
115: try {
116: session.close();
117: } catch (JMSException e) {
118: log.error(e, e);
119: }
120: }
121: }
122: }
123:
124: public Message getBySelector(final Queue queue,
125: final String selector) throws JmsTemplateException {
126: return (Message) execute(new SessionCallback() {
127:
128: public Object doInJms(Session session) throws Exception {
129: return getBySelector(session, queue, selector);
130: }
131:
132: });
133: }
134:
135: @SuppressWarnings("unchecked")
136: public List<Message> findBySelector(final Queue queue,
137: final String selector) throws JmsTemplateException {
138: return (List<Message>) execute(new SessionCallback() {
139:
140: public Object doInJms(Session session) throws Exception {
141: return findBySelector(session, queue, selector);
142: }
143:
144: });
145: }
146:
147: public Message getBySelector(final Session session,
148: final Queue queue, final String selector)
149: throws JMSException {
150: QueueBrowser browser = session.createBrowser(queue, selector);
151: Enumeration enumeration = browser.getEnumeration();
152: if (enumeration.hasMoreElements()) {
153: return (Message) enumeration.nextElement();
154: } else {
155: return null;
156: }
157: }
158:
159: @SuppressWarnings("unchecked")
160: public List<Message> findBySelector(final Session session,
161: final Queue queue, final String selector)
162: throws JMSException {
163: QueueBrowser browser = session.createBrowser(queue, selector);
164: Enumeration enumeration = browser.getEnumeration();
165: List<Message> messages = new ArrayList<Message>();
166: while (enumeration.hasMoreElements()) {
167: messages.add((Message) enumeration.nextElement());
168: }
169: return messages;
170: }
171:
172: public Object execute(boolean transacted, int ackMode,
173: SessionCallback callback) throws JmsTemplateException {
174: Session session = null;
175: try {
176: session = connection.createSession(transacted, ackMode);
177: Object result = callback.doInJms(session);
178: return result;
179: } catch (Exception e) {
180: throw new JmsTemplateException(e);
181: } finally {
182: if (session != null) {
183: try {
184: session.close();
185: } catch (JMSException e) {
186: log.error(e, e);
187: }
188: }
189: }
190: }
191:
192: public Object executeNewConnection(boolean transacted, int ackMode,
193: SessionCallback callback) throws JmsTemplateException {
194: Session session = null;
195: Connection connection = null;
196: try {
197: if (username != null) {
198: connection = connectionFactory.createConnection(
199: username, password);
200: } else {
201: connection = connectionFactory.createConnection();
202: }
203: connection.start();
204: session = connection.createSession(transacted, ackMode);
205: Object result = callback.doInJms(session);
206: connection.stop();
207: return result;
208: } catch (Exception e) {
209: throw new JmsTemplateException(e);
210: } finally {
211: if (session != null) {
212: try {
213: session.close();
214: } catch (JMSException e) {
215: log.error(e, e);
216: }
217: }
218: if (connection != null) {
219: try {
220: connection.close();
221: } catch (JMSException e) {
222: log.error(e, e);
223: }
224: }
225: }
226: }
227:
228: public Queue getQueue(final String name)
229: throws JmsTemplateException {
230: Future<Queue> future = executor.submit(new Callable<Queue>() {
231: public Queue call() throws Exception {
232: try {
233: return singletonSession.createQueue(name);
234: } catch (JMSException e) {
235: throw new RuntimeException(e);
236: }
237: }
238: });
239: try {
240: return future.get();
241: } catch (InterruptedException e) {
242: throw new JmsTemplateException(e);
243: } catch (ExecutionException e) {
244: throw new JmsTemplateException(e);
245: }
246: }
247:
248: public Topic getTopic(final String name)
249: throws JmsTemplateException {
250: Future<Topic> future = executor.submit(new Callable<Topic>() {
251: public Topic call() throws Exception {
252: try {
253: return singletonSession.createTopic(name);
254: } catch (JMSException e) {
255: throw new RuntimeException(e);
256: }
257: }
258: });
259: try {
260: return future.get();
261: } catch (InterruptedException e) {
262: throw new JmsTemplateException(e);
263: } catch (ExecutionException e) {
264: throw new JmsTemplateException(e);
265: }
266: }
267:
268: public void setMessageListener(final Destination destination,
269: final String selector, final MessageListener messageListener)
270: throws JmsTemplateException {
271: Future<Object> future = executor.submit(new Callable<Object>() {
272: public Object call() throws Exception {
273: try {
274: MessageConsumer consumer = singletonSession
275: .createConsumer(destination, selector);
276: consumer.setMessageListener(messageListener);
277: return null;
278: } catch (JMSException e) {
279: throw new RuntimeException(e);
280: }
281: }
282: });
283: try {
284: future.get();
285: } catch (InterruptedException e) {
286: throw new JmsTemplateException(e);
287: } catch (ExecutionException e) {
288: throw new JmsTemplateException(e);
289: }
290: }
291:
292: public Message copyMessage(Session session, Message in,
293: String... ignore) throws JMSException {
294: Message out = null;
295: if (in instanceof BytesMessage) {
296: BytesMessage inBytesMessage = (BytesMessage) in;
297: BytesMessage outBytesMessage = session.createBytesMessage();
298: byte[] buf = new byte[4096];
299: int read = -1;
300: while ((read = inBytesMessage.readBytes(buf)) != -1) {
301: outBytesMessage.writeBytes(buf, 0, read);
302: }
303: out = outBytesMessage;
304: } else if (in instanceof ObjectMessage) {
305: // ObjectMessage inObjectMessage = (ObjectMessage) in;
306: // ObjectMessage outObjectMessage = session.createObjectMessage();
307: // out = outObjectMessage;
308: throw new UnsupportedOperationException();
309: } else if (in instanceof MapMessage) {
310: // MapMessage inMapMessage = (MapMessage) in;
311: // MapMessage outMapMessage = session.createMapMessage();
312: // out = outMapMessage;
313: throw new UnsupportedOperationException();
314: } else if (in instanceof StreamMessage) {
315: throw new UnsupportedOperationException();
316: } else {
317: out = session.createMessage();
318: }
319: Set<String> ignoreSet = null;
320: if (ignore != null) {
321: ignoreSet = new HashSet<String>();
322: for (int i = 0; i < ignore.length; i++) {
323: String ignoreValue = ignore[i];
324: ignoreSet.add(ignoreValue);
325: }
326: }
327: Enumeration propertyNames = in.getPropertyNames();
328: while (propertyNames.hasMoreElements()) {
329: String propertyName = (String) propertyNames.nextElement();
330: if (ignore != null || !ignoreSet.contains(propertyName)) {
331: out.setObjectProperty(propertyName, in
332: .getObjectProperty(propertyName));
333: }
334: }
335: return out;
336: }
337:
338: public void setAckMode(int ackMode) {
339: this .ackMode = ackMode;
340: }
341:
342: public void setTransacted(boolean transacted) {
343: this .transacted = transacted;
344: }
345:
346: public void afterPropertiesSet() throws Exception {
347: if (username != null) {
348: this .connection = connectionFactory.createConnection(
349: username, password);
350: } else {
351: this .connection = connectionFactory.createConnection();
352: }
353: this .connection.start();
354: Future<Session> future = executor
355: .submit(new Callable<Session>() {
356: public Session call() {
357: try {
358: return connection.createSession(transacted,
359: ackMode);
360: } catch (JMSException e) {
361: throw new RuntimeException(e);
362: }
363: }
364: });
365: singletonSession = future.get();
366: }
367:
368: public void destroy() throws Exception {
369: if (connection != null) {
370: try {
371: connection.stop();
372: } catch (JMSException e) {
373: log.error(e, e);
374: }
375: try {
376: connection.close();
377: } catch (JMSException e) {
378: log.error(e, e);
379: }
380: }
381: }
382:
383: public void setConnectionFactory(ConnectionFactory connectionFactory) {
384: this .connectionFactory = connectionFactory;
385: }
386:
387: public void setPassword(String password) {
388: this .password = password;
389: }
390:
391: public void setUsername(String username) {
392: this.username = username;
393: }
394:
395: }
|