001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq;
023:
024: import java.util.Enumeration;
025:
026: import javax.jms.BytesMessage;
027: import javax.jms.DeliveryMode;
028: import javax.jms.Destination;
029: import javax.jms.IllegalStateException;
030: import javax.jms.InvalidDestinationException;
031: import javax.jms.JMSException;
032: import javax.jms.MapMessage;
033: import javax.jms.Message;
034: import javax.jms.MessageEOFException;
035: import javax.jms.MessageProducer;
036: import javax.jms.ObjectMessage;
037: import javax.jms.StreamMessage;
038: import javax.jms.TemporaryQueue;
039: import javax.jms.TemporaryTopic;
040: import javax.jms.TextMessage;
041:
042: import org.jboss.logging.Logger;
043:
044: import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
045:
046: /**
047: * This class implements javax.jms.MessageProducer
048: *
049: * @author Norbert Lataille (Norbert.Lataille@m4x.org)
050: * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
051: * @version $Revision: 57198 $
052: */
053: public class SpyMessageProducer implements MessageProducer {
054: /** The log */
055: static Logger log = Logger.getLogger(SpyMessageProducer.class);
056:
057: /** Is trace enabled */
058: static boolean trace = log.isTraceEnabled();
059:
060: /** The session */
061: protected SpySession session;
062: /** The destination */
063: protected Destination destination;
064: /** The defaultDeliveryMode */
065: protected int defaultDeliveryMode = SpyMessage.DEFAULT_DELIVERY_MODE;
066: /** The defaultPriority */
067: protected int defaultPriority = SpyMessage.DEFAULT_PRIORITY;
068: /** The defaultTTL */
069: protected long defaultTTL = SpyMessage.DEFAULT_TIME_TO_LIVE;
070: /** Whether we are closed */
071: private SynchronizedBoolean closed = new SynchronizedBoolean(false);
072: /** Whether to disable MessageID generation */
073: private boolean disableMessageID = false;
074: /** Whether to disable timestamp generation */
075: private boolean disableTS = false;
076:
077: /**
078: * Create a new SpyMessageProducer
079: *
080: * @param session the session
081: * @param destination the destination
082: */
083: SpyMessageProducer(SpySession session, Destination destination) {
084: trace = log.isTraceEnabled();
085:
086: this .session = session;
087: this .destination = destination;
088: try {
089: if (destination instanceof TemporaryQueue
090: || destination instanceof TemporaryTopic)
091: setDeliveryMode(DeliveryMode.NON_PERSISTENT);
092: else
093: setDeliveryMode(DeliveryMode.PERSISTENT);
094: } catch (JMSException ignored) {
095: log.debug("Ignored error during setDeliveryMode", ignored);
096: }
097:
098: if (trace)
099: log.trace("New message producer " + this );
100: }
101:
102: public void setDisableMessageID(boolean value) throws JMSException {
103: checkClosed();
104: disableMessageID = value;
105: }
106:
107: public void setDisableMessageTimestamp(boolean value)
108: throws JMSException {
109: checkClosed();
110: disableTS = value;
111: }
112:
113: public void setDeliveryMode(int deli) throws JMSException {
114: checkClosed();
115: if (deli != DeliveryMode.NON_PERSISTENT
116: && deli != DeliveryMode.PERSISTENT)
117: throw new JMSException("Bad DeliveryMode value");
118: else
119: defaultDeliveryMode = deli;
120: }
121:
122: public void setPriority(int pri) throws JMSException {
123: checkClosed();
124: if (pri < 0 || pri > 9)
125: throw new JMSException("Bad priority value");
126: else
127: defaultPriority = pri;
128: }
129:
130: public void setTimeToLive(int timeToLive) throws JMSException {
131: checkClosed();
132: if (timeToLive < 0)
133: throw new JMSException("Bad TimeToLive value");
134: else
135: defaultTTL = timeToLive;
136: }
137:
138: public void setTimeToLive(long timeToLive) throws JMSException {
139: checkClosed();
140: if (timeToLive < 0)
141: throw new JMSException("Bad TimeToLive value");
142: else
143: defaultTTL = timeToLive;
144: }
145:
146: public boolean getDisableMessageID() throws JMSException {
147: checkClosed();
148: return disableMessageID;
149: }
150:
151: public boolean getDisableMessageTimestamp() throws JMSException {
152: checkClosed();
153: return disableTS;
154: }
155:
156: public int getDeliveryMode() throws JMSException {
157: checkClosed();
158: return defaultDeliveryMode;
159: }
160:
161: public int getPriority() throws JMSException {
162: checkClosed();
163: return defaultPriority;
164: }
165:
166: public long getTimeToLive() throws JMSException {
167: checkClosed();
168: return defaultTTL;
169: }
170:
171: public void close() throws JMSException {
172: if (closed.set(true))
173: return;
174:
175: session.removeProducer(this );
176:
177: if (trace)
178: log.trace("Closed " + this );
179: }
180:
181: public Destination getDestination() throws JMSException {
182: checkClosed();
183: return destination;
184: }
185:
186: public void send(Message message) throws JMSException {
187: if (destination == null)
188: throw new UnsupportedOperationException(
189: "Not constructed with identifyed destination. Usage of method not allowed");
190: send(destination, message, defaultDeliveryMode,
191: defaultPriority, defaultTTL);
192: }
193:
194: public void send(Destination destination, Message message)
195: throws JMSException {
196: send(destination, message, defaultDeliveryMode,
197: defaultPriority, defaultTTL);
198: }
199:
200: public void send(Message message, int deliveryMode, int priority,
201: long ttl) throws JMSException {
202: if (destination == null)
203: throw new UnsupportedOperationException(
204: "Not constructed with identifyed destination. Usage of method not allowed");
205: send(destination, message, deliveryMode, priority, ttl);
206: }
207:
208: public void send(Destination destination, Message message,
209: int deliveryMode, int priority, long ttl)
210: throws JMSException {
211: checkClosed();
212:
213: if (this .destination != null
214: && this .destination.equals(destination) == false)
215: throw new UnsupportedOperationException("Sending to "
216: + destination
217: + " not allowed when producer created with "
218: + this .destination);
219:
220: if (destination == null
221: || (destination instanceof SpyDestination) == false)
222: throw new InvalidDestinationException(
223: "Destination is not an instance of SpyDestination "
224: + destination);
225:
226: // Encapsulate the message if not a SpyMessage
227: SpyMessage sendMessage;
228: if ((message instanceof SpyMessage) == false)
229: sendMessage = encapsulateMessage(message);
230: else
231: sendMessage = (SpyMessage) message;
232:
233: //Set the header fields
234: sendMessage.setJMSDestination(destination);
235: sendMessage.setJMSDeliveryMode(deliveryMode);
236: long ts = System.currentTimeMillis();
237: sendMessage.setJMSTimestamp(ts);
238: if (ttl == 0)
239: sendMessage.setJMSExpiration(0);
240: else
241: sendMessage.setJMSExpiration(ttl + ts);
242: sendMessage.setJMSPriority(priority);
243: String id = session.getNewMessageID();
244: sendMessage.setJMSMessageID(id);
245:
246: // If we encapsulated the message, update the original message
247: if (message != sendMessage) {
248: message.setJMSDestination(destination);
249: message.setJMSDeliveryMode(deliveryMode);
250: message.setJMSTimestamp(ts);
251: if (ttl == 0)
252: message.setJMSExpiration(0);
253: else
254: message.setJMSExpiration(ttl + ts);
255: message.setJMSPriority(priority);
256: message.setJMSMessageID(id);
257: }
258:
259: if (trace)
260: log.trace("Sending message " + this + " \n" + sendMessage);
261:
262: //Send the message.
263: session.sendMessage(sendMessage);
264: }
265:
266: public String toString() {
267: StringBuffer buffer = new StringBuffer(100);
268: buffer.append("SpyMessageProducer@").append(
269: System.identityHashCode(this ));
270: buffer.append("[ dest=").append(destination);
271: if (defaultDeliveryMode == DeliveryMode.PERSISTENT)
272: buffer.append(" delivery=").append("persist");
273: else
274: buffer.append(" delivery=").append("besteffort");
275: buffer.append(" priority=").append(defaultPriority);
276: buffer.append(" ttl=").append(defaultTTL);
277: buffer.append(" disableMessageID=").append(disableMessageID);
278: buffer.append(" disableTS=").append(disableTS);
279: buffer.append(" session=").append(session);
280: buffer.append(']');
281: return buffer.toString();
282: }
283:
284: protected SpyMessage encapsulateMessage(Message message)
285: throws JMSException {
286: SpyMessage result;
287: if (message instanceof BytesMessage) {
288: result = MessagePool.getBytesMessage();
289: BytesMessage original = (BytesMessage) message;
290: original.reset();
291: byte[] temp = new byte[1024];
292: int bytes = original.readBytes(temp);
293: while (bytes != -1) {
294: ((BytesMessage) result).writeBytes(temp, 0, bytes);
295: bytes = original.readBytes(temp);
296: }
297: } else if (message instanceof MapMessage) {
298: result = MessagePool.getMapMessage();
299: MapMessage original = (MapMessage) message;
300: for (Enumeration en = original.getMapNames(); en
301: .hasMoreElements();) {
302: String key = (String) en.nextElement();
303: try {
304: ((MapMessage) result).setObject(key, original
305: .getObject(key));
306: } catch (JMSException ignored) {
307: if (trace)
308: log.trace("Unable to copy map entry " + key,
309: ignored);
310: }
311: }
312: } else if (message instanceof StreamMessage) {
313: result = MessagePool.getStreamMessage();
314: StreamMessage original = (StreamMessage) message;
315: original.reset();
316: try {
317: while (true) {
318: ((StreamMessage) result).writeObject(original
319: .readObject());
320: }
321: } catch (MessageEOFException expected) {
322: }
323: } else if (message instanceof ObjectMessage) {
324: result = MessagePool.getObjectMessage();
325: ((ObjectMessage) result)
326: .setObject(((ObjectMessage) message).getObject());
327: } else if (message instanceof TextMessage) {
328: result = MessagePool.getTextMessage();
329: ((TextMessage) result).setText(((TextMessage) message)
330: .getText());
331: } else
332: result = MessagePool.getMessage();
333:
334: // Copy headers
335: try {
336: result.setJMSCorrelationID(message.getJMSCorrelationID());
337: } catch (JMSException e) {
338: //must be as bytes
339: result.setJMSCorrelationIDAsBytes(message
340: .getJMSCorrelationIDAsBytes());
341: }
342: result.setJMSReplyTo(message.getJMSReplyTo());
343: result.setJMSType(message.getJMSType());
344:
345: // Copy properties
346: for (Enumeration en = message.getPropertyNames(); en
347: .hasMoreElements();) {
348: String key = (String) en.nextElement();
349: try {
350: result.setObjectProperty(key, message
351: .getObjectProperty(key));
352: } catch (JMSException ignored) {
353: if (trace)
354: log
355: .trace("Unable to copy property " + key,
356: ignored);
357: }
358: }
359:
360: return result;
361: }
362:
363: /**
364: * Check whether we are closed
365: *
366: * @throws IllegalStateException when the session is closed
367: */
368: protected void checkClosed() throws JMSException {
369: if (closed.get())
370: throw new IllegalStateException(
371: "Message producer is closed");
372: }
373: }
|