001: /*
002: * BEGIN_HEADER - DO NOT EDIT
003: *
004: * The contents of this file are subject to the terms
005: * of the Common Development and Distribution License
006: * (the "License"). You may not use this file except
007: * in compliance with the License.
008: *
009: * You can obtain a copy of the license at
010: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
011: * See the License for the specific language governing
012: * permissions and limitations under the License.
013: *
014: * When distributing Covered Code, include this CDDL
015: * HEADER in each file and include the License file at
016: * https://open-esb.dev.java.net/public/CDDLv1.0.html.
017: * If applicable add the following below this CDDL HEADER,
018: * with the fields enclosed by brackets "[]" replaced with
019: * your own identifying information: Portions Copyright
020: * [year] [name of copyright owner]
021: */
022:
023: /*
024: * @(#)MQSession.java
025: * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
026: *
027: * END_HEADER - DO NOT EDIT
028: */
029: package com.sun.jbi.binding.jms.mq;
030:
031: import com.sun.jbi.StringTranslator;
032:
033: import com.sun.jbi.binding.jms.EndpointBean;
034: import com.sun.jbi.binding.jms.JMSBindingContext;
035: import com.sun.jbi.binding.jms.JMSBindingResources;
036:
037: import com.sun.jbi.binding.jms.handler.MessageHandler;
038:
039: import com.sun.jbi.binding.jms.util.UtilBase;
040:
041: import com.sun.jbi.binding.jms.config.ConfigConstants;
042: import java.util.logging.Logger;
043:
044: import javax.jms.InvalidDestinationException;
045: import javax.jms.InvalidSelectorException;
046: import javax.jms.JMSException;
047:
048: import javax.jms.Message;
049:
050: import javax.jms.Session;
051:
052: /**
053: * Wraps a JMS session.
054: *
055: * @author Sun Microsystems Inc.
056: */
057: public final class MQSession extends UtilBase implements Runnable,
058: JMSBindingResources {
059: /**
060: * Message processor.
061: */
062: private JMSMessageProcessor mProcessor;
063: /**
064: * Logger.
065: */
066: private Logger mLogger;
067: /**
068: * Connection.
069: */
070: private MQConnection mConnection;
071: /**
072: * Destination.
073: */
074: private MQDestination mDestination;
075: /**
076: * Consumer.
077: */
078: private javax.jms.MessageConsumer mConsumer;
079: /**
080: * Monitor.
081: */
082: private Object mMonitor;
083: /**
084: * JMS Sesion.
085: */
086: private javax.jms.Session mSession;
087: /**
088: * Durability.
089: */
090: private String mDurabilty;
091: /**
092: * Message Selector.
093: */
094: private String mMessageSelector = "";
095: /**
096: * Unique name.
097: */
098: private String mUniqueName;
099: /**
100: * Initialised.
101: */
102: private boolean mInitialised = false;
103: /**
104: * Receiver.
105: */
106: private boolean mReceiver;
107: /**
108: * Transacted.
109: */
110: private boolean mTransacted = false;
111: /**
112: * Ackonwledgement mode.
113: */
114: private int mAckMode = Session.CLIENT_ACKNOWLEDGE;
115: /**
116: * Toic or Queue.
117: */
118: private int mStyle;
119:
120: /**
121: * i18n.
122: */
123: private StringTranslator mTranslator;
124:
125: /**
126: * Creates a new JMS Session.
127: */
128: public MQSession() {
129: mTransacted = false;
130: mProcessor = new JMSMessageProcessor();
131: mDurabilty = ConfigConstants.NON_DURABLE;
132: mLogger = JMSBindingContext.getInstance().getLogger();
133: mTranslator = JMSBindingContext.getInstance()
134: .getStringTranslator();
135: mMonitor = new Object();
136: }
137:
138: /**
139: * Gets the type of ackowledgement mode for this session.
140: *
141: * @return type of ACK mode.
142: */
143: public int getAckMode() {
144: return mAckMode;
145: }
146:
147: /**
148: * Creates a new MQSession object. This constructor will be used by manager
149: * for creating message receivers.
150: *
151: * @param bean endpooint bean.
152: */
153: public void setBean(EndpointBean bean) {
154: setConnection(bean.getConnection());
155: setStyle(bean.getStyle());
156: setName(bean.getUniqueName());
157: setDestination(bean.getDestination());
158: setMessageSelector((String) bean
159: .getValue(ConfigConstants.MESSAGE_SELECTOR));
160: setDurabilty((String) bean.getValue(ConfigConstants.DURABILITY));
161: }
162:
163: /**
164: * Sets the connection object.
165: *
166: * @param con Wrapper to JMS connection.
167: */
168: public void setConnection(MQConnection con) {
169: mConnection = con;
170: }
171:
172: /**
173: * Gets the MQ connection object.
174: *
175: * @return wrapper to MQ connection.
176: */
177: public MQConnection getConnection() {
178: return mConnection;
179: }
180:
181: /**
182: * Sets the destination.
183: *
184: * @param dest wrapper to JMS destination.
185: */
186: public void setDestination(MQDestination dest) {
187: mDestination = dest;
188: }
189:
190: /**
191: * Sets the durability.
192: *
193: * @param durable durability.
194: */
195: public void setDurabilty(String durable) {
196: mDurabilty = durable;
197: }
198:
199: /**
200: * Sets the message selector.
201: *
202: * @param sel selector.
203: */
204: public void setMessageSelector(String sel) {
205: mMessageSelector = sel;
206: }
207:
208: /**
209: * Sets the name.
210: *
211: * @param name name.
212: */
213: public void setName(String name) {
214: mUniqueName = name;
215: }
216:
217: /**
218: * Sets a receiver.
219: *
220: * @return true if the receiver is set.
221: */
222: public boolean setReceiver() {
223: clear();
224:
225: if (!mInitialised) {
226: mLogger.severe(mTranslator
227: .getString(JMS_SESSION_NOT_INITIALIZED));
228: setError(mTranslator.getString(JMS_SESSION_NOT_INITIALIZED));
229:
230: return false;
231: }
232:
233: try {
234: mLogger.fine(mTranslator.getString(
235: JMS_MESSAGE_SELECTOR_FINE, mMessageSelector));
236:
237: if (mDurabilty.equals(ConfigConstants.DURABLE)) {
238: mConsumer = mSession
239: .createDurableSubscriber(
240: (javax.jms.Topic) mDestination
241: .getDestination(), mUniqueName,
242: mMessageSelector, true);
243: } else {
244: mConsumer = mSession.createConsumer(mDestination
245: .getDestination(), mMessageSelector);
246: }
247: } catch (JMSException je) {
248: setError(mTranslator.getString(
249: JMS_MQ_CANNOT_CREATE_CONSUMER, mDestination));
250: setError(je.getMessage());
251:
252: return false;
253: }
254:
255: mReceiver = true;
256:
257: return true;
258: }
259:
260: /**
261: * Sets the style.
262: *
263: * @param style topic or queue.
264: */
265: public void setStyle(int style) {
266: mStyle = style;
267: }
268:
269: /**
270: * Gets the style.
271: *
272: * @return style.
273: */
274: public int getStyle() {
275: return mStyle;
276: }
277:
278: /**
279: * Set transacted.
280: */
281: public void setTransacted() {
282: mTransacted = true;
283: }
284:
285: /**
286: * Gets the tarnasction status.
287: *
288: * @return transaction status.
289: */
290: public boolean getTransacted() {
291: return mTransacted;
292: }
293:
294: /**
295: * Close.
296: */
297: public void close() {
298: try {
299: mSession.close();
300: } catch (JMSException je) {
301: setError(mTranslator.getString(JMS_CANNOT_CLOSE_SESSION,
302: mSession));
303: setError(je.getMessage());
304: }
305: }
306:
307: /**
308: * Creates a JMS message.
309: *
310: * @param type type of JMS message.
311: *
312: * @return JMS message.
313: */
314: public Message createJMSMessage(String type) {
315: Message msg = null;
316:
317: try {
318: if (type.equals("TextMessage")) {
319: msg = mSession.createTextMessage();
320: } else if (type.equals("StreamMessage")) {
321: msg = mSession.createStreamMessage();
322: } else if (type.equals("MapMessage")) {
323: msg = mSession.createMapMessage();
324: } else if (type.equals("ByteMessage")) {
325: msg = mSession.createBytesMessage();
326: } else if (type.equals("ObjectMessage")) {
327: msg = mSession.createObjectMessage();
328: }
329: } catch (Exception e) {
330: setError(mTranslator.getString(JMS_CANNOT_CREATE_MESSAGE,
331: type));
332: setError(e.getMessage());
333: }
334:
335: return msg;
336: }
337:
338: /**
339: * Initializes the session.
340: *
341: * @throws JMSException
342: */
343: public void init() throws JMSException {
344: clear();
345:
346: if (mInitialised) {
347: return;
348: }
349:
350: try {
351: if (mStyle == ConfigConstants.QUEUE) {
352: mSession = ((javax.jms.QueueConnection) (mConnection
353: .getConnection())).createQueueSession(
354: mTransacted, mAckMode);
355: } else {
356: mSession = ((javax.jms.TopicConnection) (mConnection
357: .getConnection())).createTopicSession(
358: mTransacted, mAckMode);
359: }
360:
361: mInitialised = true;
362: mReceiver = false;
363: } catch (JMSException je) {
364: mLogger.severe(mTranslator.getString(
365: JMS_CANNOT_CREATE_SESSION, je.getMessage()));
366: setError(je.getMessage());
367: throw je;
368: }
369: }
370:
371: /**
372: * Runs the thread.
373: */
374: public void run() {
375: mLogger.fine("Starting Receiver for Destination "
376: + mDestination.getDestination().toString());
377:
378: if (!mInitialised) {
379: mLogger.severe(mTranslator
380: .getString(JMS_SESSION_NOT_INITIALIZED));
381: setError(mTranslator.getString(JMS_SESSION_NOT_INITIALIZED));
382:
383: return;
384: }
385:
386: JMSBindingContext ctx = JMSBindingContext.getInstance();
387:
388: try {
389: Message msg = null;
390:
391: while (mMonitor != null) {
392: synchronized (this ) {
393: msg = mConsumer.receive(50);
394: if (msg != null) {
395: msg.acknowledge();
396: }
397: }
398:
399: MessageHandler hndl = null;
400:
401: if (msg != null) {
402: hndl = mProcessor.process(mUniqueName, msg);
403: }
404:
405: if (hndl == null) {
406: // decide where to acknowledge or continue
407: continue;
408: }
409:
410: ctx.getJMSWorkManager().processCommand(hndl);
411: mLogger.fine(msg.toString());
412: }
413: } catch (InvalidDestinationException ie) {
414: ie.printStackTrace();
415: } catch (InvalidSelectorException is) {
416: is.printStackTrace();
417: } catch (JMSException je) {
418: if (je.getErrorCode().equals(MQCodes.GOODBYE)) {
419: return;
420: }
421: } catch (Exception e) {
422: e.printStackTrace();
423: } finally {
424: if (mConsumer != null) {
425: try {
426: mConsumer.close();
427: } catch (JMSException je) {
428: ;
429: }
430: }
431: }
432:
433: mReceiver = false;
434: }
435:
436: /**
437: * Sends the message.
438: *
439: * @param msg JMS message to be sent.
440: * @param dest destination on which it has to be sent.
441: *
442: * @return true if successfuly sent.
443: */
444: public synchronized boolean sendMessage(Message msg,
445: MQDestination dest) {
446: javax.jms.MessageProducer producer = null;
447: clear();
448:
449: try {
450: producer = mSession.createProducer(dest.getDestination());
451: msg.setJMSDestination(dest.getDestination());
452: producer.send(msg);
453: mLogger.fine("Sent message " + msg.toString());
454: producer.close();
455: } catch (Throwable t) {
456: t.printStackTrace();
457: setError(t.getMessage());
458:
459: return false;
460:
461: // dunno what to do wil decide
462: } finally {
463: if (producer != null) {
464: try {
465: producer.close();
466: } catch (JMSException je) {
467: ;
468: }
469: }
470: }
471:
472: return true;
473: }
474:
475: /**
476: * Stops receiving on this session.
477: */
478: public void stopReceiving() {
479: mMonitor = null;
480: try {
481: mConsumer.close();
482: } catch (JMSException je) {
483: ;
484: }
485: }
486: }
|