001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.server;
023:
024: import java.lang.ref.SoftReference;
025:
026: import javax.jms.DeliveryMode;
027: import javax.jms.JMSException;
028:
029: import org.jboss.logging.Logger;
030: import org.jboss.mq.DurableSubscriptionID;
031: import org.jboss.mq.SpyMessage;
032:
033: /**
034: * This class holds a reference to an actual Message. Where it is actually
035: * at may vary. The reference it holds may be a:
036: * <ul>
037: * <li>Hard Reference - The message is consider recently used and should not be paged out
038: * <li>Soft Reference - The message is consider old and CAN be removed from memory by the GC
039: * <li>No Reference - The message was removed from memory by the GC, but we can load it from a file.
040: * </ul>
041: *
042: * @author <a href="mailto:hiram.chirino@jboss.org">Hiram Chirino</a>
043: * @author <a href="mailto:pra@tim.se">Peter Antman</a>
044: * @version $Revision: 57198 $
045: */
046: public class MessageReference implements Comparable {
047: static Logger log = Logger.getLogger(MessageReference.class);
048:
049: /**
050: * The message is not persisted
051: */
052: public static final int NOT_STORED = 1;
053:
054: /**
055: * The message is persisted
056: */
057: public static final int STORED = 2;
058:
059: /**
060: * It was a persistent message for a joint
061: * cache store/persistent manager.
062: * This states guards against double
063: * removal from the cache while keeping
064: * error checking for incorrect double removal
065: * No message should be at this state for very long
066: */
067: public static final int REMOVED = 3;
068:
069: public long referenceId;
070: public SpyMessage hardReference;
071:
072: // These fields are copied over from the messae itself..
073: // they are used too often to not have them handy.
074: public byte jmsPriority;
075: public long messageId;
076: public int jmsDeliveryMode;
077: public long messageScheduledDelivery;
078: public long messageExpiration;
079:
080: public boolean redelivered;
081: public long redeliveryDelay;
082: public int redeliveryCount;
083:
084: // Data used on the server
085: public BasicQueue queue;
086: public MessageCache messageCache;
087: public SoftReference softReference;
088: public DurableSubscriptionID durableSubscriberID;
089: public int stored;
090: transient public Object persistData;
091:
092: MessageReference() {
093: }
094:
095: //init and reset methods for use by object pool
096: void init(MessageCache messageCache, long referenceId,
097: SpyMessage message, BasicQueue queue,
098: DurableSubscriptionID id) throws JMSException {
099: this .messageCache = messageCache;
100: this .hardReference = message;
101: this .referenceId = referenceId;
102: this .jmsPriority = (byte) message.getJMSPriority();
103: this .messageId = message.header.messageId;
104: this .stored = NOT_STORED;
105:
106: this .jmsDeliveryMode = message.header.jmsDeliveryMode;
107: if (message
108: .propertyExists(SpyMessage.PROPERTY_SCHEDULED_DELIVERY))
109: this .messageScheduledDelivery = message
110: .getLongProperty(SpyMessage.PROPERTY_SCHEDULED_DELIVERY);
111: if (message
112: .propertyExists(SpyMessage.PROPERTY_REDELIVERY_DELAY))
113: this .redeliveryDelay = message
114: .getLongProperty(SpyMessage.PROPERTY_REDELIVERY_DELAY);
115: // If redelivery delay is not specified, then use the value specified
116: // in queue configuration.
117: else if (queue.parameters.redeliveryDelay > 0)
118: this .redeliveryDelay = queue.parameters.redeliveryDelay;
119:
120: // If redelivery limit is not specified, then use the value specified
121: // in queue configuration.
122: if (queue.parameters.redeliveryLimit > -1
123: && !message
124: .propertyExists(SpyMessage.PROPERTY_REDELIVERY_LIMIT))
125: message.header.jmsProperties.put(
126: SpyMessage.PROPERTY_REDELIVERY_LIMIT, new Integer(
127: queue.parameters.redeliveryLimit));
128:
129: this .messageExpiration = message.getJMSExpiration();
130: this .redelivered = message.getJMSRedelivered();
131: if (message
132: .propertyExists(SpyMessage.PROPERTY_REDELIVERY_COUNT))
133: this .redeliveryCount = message
134: .getIntProperty(SpyMessage.PROPERTY_REDELIVERY_COUNT);
135:
136: this .durableSubscriberID = id;
137:
138: this .queue = queue;
139: this .persistData = null;
140: }
141:
142: void reset() {
143: //clear refs so gc can collect unused objects
144: if (softReference != null && softReference.get() != null)
145: messageCache.softRefCacheSize--;
146: this .messageCache = null;
147: this .hardReference = null;
148: this .softReference = null;
149: this .queue = null;
150: }
151:
152: public SpyMessage getMessageForDelivery() throws JMSException {
153: SpyMessage message = getMessage();
154: if (queue.parameters.lateClone) {
155: message = message.myClone();
156: message.header.durableSubscriberID = durableSubscriberID;
157: message.header.jmsRedelivered = redelivered;
158: message.header.jmsProperties.put(
159: SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer(
160: redeliveryCount));
161: }
162: return message;
163: }
164:
165: public SpyMessage getMessage() throws JMSException {
166: SpyMessage result = null;
167:
168: synchronized (this ) {
169: if (hardReference == null) {
170: makeHard();
171: result = hardReference;
172: messageCache.messageReferenceUsedEvent(this , false);
173: } else {
174: result = hardReference;
175: messageCache.cacheHits++;
176: messageCache.messageReferenceUsedEvent(this , true);
177: }
178: return result;
179: }
180: }
181:
182: /**
183: * The message is being redelivered
184: */
185: public void redelivered() throws JMSException {
186: this .redelivered = true;
187:
188: if (redeliveryDelay != 0) {
189: log.trace("message has redelivery delay");
190: messageScheduledDelivery = System.currentTimeMillis()
191: + redeliveryDelay;
192: }
193:
194: ++redeliveryCount;
195:
196: if (isLateClone() == false) {
197: SpyMessage message = getMessage();
198: message.setJMSRedelivered(redelivered);
199:
200: message.header.jmsProperties.put(
201: SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer(
202: redeliveryCount));
203: }
204: }
205:
206: /**
207: * Returns true if this message reference has expired.
208: */
209: public boolean isExpired() {
210: if (messageExpiration == 0)
211: return false;
212: long ts = System.currentTimeMillis();
213: return messageExpiration < ts;
214: }
215:
216: /**
217: * Determines whether the message is persistent in the sense
218: * that it survives a crash
219: */
220: public boolean isPersistent() {
221: return queue instanceof PersistentQueue
222: && jmsDeliveryMode == DeliveryMode.PERSISTENT;
223: }
224:
225: /**
226: * Are we entirely in memory?
227: *
228: * @return true when in memory, false otherwise
229: */
230: public boolean inMemory() {
231: return queue.parameters.inMemory;
232: }
233:
234: /**
235: * Determines the persistent for storing the message
236: */
237: public String getPersistentKey() {
238: return queue.getDescription();
239: }
240:
241: /**
242: * Are we late cloning messages?
243: */
244: public boolean isLateClone() {
245: return queue.parameters.lateClone;
246: }
247:
248: /**
249: * We could optimize caching by keeping the headers but not the body.
250: * The server will uses the headers more often than the body and the
251: * headers take up much message memory than the body
252: *
253: * For now just return the message.
254: */
255: public SpyMessage.Header getHeaders() throws JMSException {
256: return getMessage().header;
257: }
258:
259: void clear() throws JMSException {
260: synchronized (this ) {
261: if (stored == STORED)
262: messageCache.removeFromStorage(this );
263: stored = MessageReference.REMOVED;
264: }
265: }
266:
267: public void invalidate() throws JMSException {
268: synchronized (this ) {
269: if (stored == STORED) {
270: if (hardReference == null) {
271: makeHard();
272: messageCache.messageReferenceUsedEvent(this , false);
273: }
274: messageCache.removeFromStorage(this );
275: }
276: }
277: }
278:
279: public void removeDelayed() throws JMSException {
280: messageCache.removeDelayed(this );
281: }
282:
283: void makeSoft() throws JMSException {
284: boolean trace = log.isTraceEnabled();
285: synchronized (this ) {
286: // Attempt to soften a removed message
287: if (stored == REMOVED)
288: throw new JMSException(
289: "CACHE ERROR: makeSoft() on a removed message "
290: + this );
291:
292: // It is already soft
293: if (softReference != null) {
294: // Sanity check
295: if (stored == NOT_STORED)
296: throw new JMSException(
297: "CACHE ERROR: soft reference to unstored message "
298: + this );
299: return;
300: }
301:
302: if (stored == NOT_STORED)
303: messageCache.saveToStorage(this , hardReference);
304:
305: // HACK: allow the jdbc2 driver to reject saveToStorage for persistent messages
306: // it is just about to persist the message when it gets some cpu
307: if (stored != STORED) {
308: if (trace)
309: log.trace("saveToStorage rejected by cache "
310: + toString());
311: return;
312: }
313:
314: if (messageCache.getMakeSoftReferences())
315: softReference = new SoftReference(hardReference,
316: messageCache.referenceQueue);
317:
318: // We don't need the hard ref anymore..
319: messageCache.soften(this );
320: hardReference = null;
321: }
322: }
323:
324: /**
325: * Called from A PeristenceManager/CacheStore,
326: * to let us know that this message is already stored on disk.
327: */
328: public void setStored(int stored) {
329: this .stored = stored;
330: }
331:
332: void makeHard() throws JMSException {
333: synchronized (this ) {
334: // Attempt to harden a removed message
335: if (stored == REMOVED)
336: throw new JMSException(
337: "CACHE ERROR: makeHard() on a removed message "
338: + this );
339:
340: // allready hard
341: if (hardReference != null)
342: return;
343:
344: // Get the object via the softref
345: if (softReference != null)
346: hardReference = (SpyMessage) softReference.get();
347:
348: // It might have been removed from the cache due to memory constraints
349: if (hardReference == null) {
350: // load it from disk.
351: hardReference = messageCache.loadFromStorage(this );
352: messageCache.cacheMisses++;
353: } else {
354: messageCache.cacheHits++;
355: }
356:
357: // Since we have hard ref, we do not need the soft one.
358: if (softReference != null && softReference.get() != null)
359: messageCache.softRefCacheSize--;
360: softReference = null;
361: }
362: }
363:
364: public boolean equals(Object o) {
365: try {
366: return referenceId == ((MessageReference) o).referenceId;
367: } catch (Throwable e) {
368: return false;
369: }
370: }
371:
372: /**
373: * This method allows message to be order on the server queues
374: * by priority and the order that they came in on.
375: *
376: * @see Comparable#compareTo(Object)
377: */
378: public int compareTo(Object o) {
379: MessageReference sm = (MessageReference) o;
380: if (jmsPriority > sm.jmsPriority) {
381: return -1;
382: }
383: if (jmsPriority < sm.jmsPriority) {
384: return 1;
385: }
386: return (int) (messageId - sm.messageId);
387: }
388:
389: /**
390: * For debugging
391: */
392: public String toString() {
393: StringBuffer buffer = new StringBuffer(100);
394: if (messageCache == null)
395: buffer.append(" NOT IN CACHE hashCode=").append(hashCode());
396:
397: else {
398: buffer.append(referenceId);
399: buffer.append(" msg=").append(messageId);
400: if (hardReference != null)
401: buffer.append(" hard");
402: if (softReference != null)
403: buffer.append(" soft");
404: switch (stored) {
405: case NOT_STORED:
406: buffer.append(" NOT_STORED");
407: break;
408: case STORED:
409: buffer.append(" STORED");
410: break;
411: case REMOVED:
412: buffer.append(" REMOVED");
413: break;
414: }
415: switch (jmsDeliveryMode) {
416: case DeliveryMode.NON_PERSISTENT:
417: buffer.append(" NON_PERSISTENT");
418: break;
419: case DeliveryMode.PERSISTENT:
420: buffer.append(" PERSISTENT");
421: break;
422: }
423: if (persistData != null)
424: buffer.append(" persistData=").append(persistData);
425: if (queue != null)
426: buffer.append(" queue=").append(queue.getDescription());
427: else
428: buffer.append(" NO_QUEUE");
429: buffer.append(" priority=").append(jmsPriority);
430: buffer.append(" lateClone=").append(isLateClone());
431: buffer.append(" hashCode=").append(hashCode());
432: }
433: return buffer.toString();
434: }
435: }
|