001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.pm.none;
023:
024: import javax.jms.JMSException;
025: import javax.management.ObjectName;
026:
027: import org.jboss.mq.SpyDestination;
028: import org.jboss.mq.SpyMessage;
029: import org.jboss.mq.pm.CacheStore;
030: import org.jboss.mq.pm.Tx;
031: import org.jboss.mq.pm.TxManager;
032: import org.jboss.mq.server.JMSDestination;
033: import org.jboss.mq.server.MessageCache;
034: import org.jboss.mq.server.MessageReference;
035: import org.jboss.system.ServiceMBeanSupport;
036:
037: import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
038: import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
039:
040: /**
041: * A persistence manager and cache store that does not persistence.
042: *
043: * It can do persistence by delegating to a real persistence manager,
044: * depending upon destination configuration.
045: *
046: * @jmx:mbean extends="org.jboss.system.ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean, org.jboss.mq.pm.CacheStoreMBean"
047: *
048: * @author Adrian Brock (adrian@jboss.org)
049: *
050: * @version $Revision: 57198 $
051: */
052: public class PersistenceManager extends ServiceMBeanSupport implements
053: PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager,
054: CacheStore {
055: // Constants --------------------------------------------------------------------
056:
057: // Attributes -------------------------------------------------------------------
058:
059: /** The next transaction id */
060: SynchronizedLong nextTransactionid = new SynchronizedLong(0l);
061:
062: /** The oject name of the delegate persistence manger */
063: ObjectName delegateName;
064:
065: /** The delegate persistence manager */
066: org.jboss.mq.pm.PersistenceManager delegate;
067:
068: /** The jbossmq transaction manager */
069: TxManager txManager;
070:
071: /** The cache */
072: ConcurrentHashMap cache = new ConcurrentHashMap();
073:
074: // Constructors -----------------------------------------------------------------
075:
076: // Public -----------------------------------------------------------------------
077:
078: /**
079: * Retrieve the delegate persistence manager
080: *
081: * @jmx:managed-attribute
082: * @return the object name of the delegate persistence manager
083: */
084: public ObjectName getDelegatePM() {
085: return delegateName;
086: }
087:
088: /**
089: * Set the delegate persistence manager
090: *
091: * @jmx:managed-attribute
092: * @param delegatePM the delegate persistence manager
093: */
094: public void setDelegatePM(ObjectName delegateName) {
095: this .delegateName = delegateName;
096: }
097:
098: // PersistenceManager implementation --------------------------------------------
099:
100: public void add(MessageReference message, Tx txId)
101: throws JMSException {
102: if (delegate != null && message.inMemory() == false)
103: delegate.add(message, txId);
104: }
105:
106: public void closeQueue(JMSDestination jmsDest, SpyDestination dest)
107: throws JMSException {
108: if (delegate != null)
109: delegate.closeQueue(jmsDest, dest);
110: }
111:
112: public void commitPersistentTx(Tx txId) throws JMSException {
113: if (delegate != null)
114: delegate.commitPersistentTx(txId);
115: }
116:
117: public Tx createPersistentTx() throws JMSException {
118: if (delegate != null)
119: return delegate.createPersistentTx();
120:
121: Tx tx = new Tx(nextTransactionid.increment());
122: return tx;
123: }
124:
125: public MessageCache getMessageCacheInstance() {
126: if (delegate != null)
127: return delegate.getMessageCacheInstance();
128:
129: throw new UnsupportedOperationException(
130: "This is now set on the destination manager");
131: }
132:
133: public TxManager getTxManager() {
134: return txManager;
135: }
136:
137: public void remove(MessageReference message, Tx txId)
138: throws JMSException {
139: if (delegate != null && message.inMemory() == false)
140: delegate.remove(message, txId);
141: }
142:
143: public void restoreQueue(JMSDestination jmsDest, SpyDestination dest)
144: throws JMSException {
145: if (delegate != null)
146: delegate.restoreQueue(jmsDest, dest);
147: }
148:
149: public void rollbackPersistentTx(Tx txId) throws JMSException {
150: if (delegate != null)
151: delegate.rollbackPersistentTx(txId);
152: }
153:
154: public void update(MessageReference message, Tx txId)
155: throws JMSException {
156: if (delegate != null && message.inMemory() == false)
157: delegate.update(message, txId);
158: }
159:
160: // PersistenceManagerMBean implementation ---------------------------------------
161:
162: public Object getInstance() {
163: return this ;
164: }
165:
166: /**
167: * Unsupported operation
168: */
169: public ObjectName getMessageCache() {
170: if (delegateName != null) {
171: try {
172: return (ObjectName) server.getAttribute(delegateName,
173: "MessageCache");
174: } catch (Exception e) {
175: log
176: .trace(
177: "Unable to retrieve message cache from delegate",
178: e);
179: }
180: }
181: throw new UnsupportedOperationException(
182: "This is now set on the destination manager");
183: }
184:
185: /**
186: * Unsupported operation
187: */
188: public void setMessageCache(ObjectName messageCache) {
189: throw new UnsupportedOperationException(
190: "This is now set on the destination manager");
191: }
192:
193: // CacheStore implementation ----------------------------------------------------
194:
195: public SpyMessage loadFromStorage(MessageReference mh)
196: throws JMSException {
197: if (delegate == null || mh.inMemory())
198: return (SpyMessage) cache.get(mh);
199: else
200: return ((CacheStore) delegate).loadFromStorage(mh);
201: }
202:
203: public void removeFromStorage(MessageReference mh)
204: throws JMSException {
205: if (delegate == null || mh.inMemory()) {
206: cache.remove(mh);
207: mh.setStored(MessageReference.NOT_STORED);
208: } else
209: ((CacheStore) delegate).removeFromStorage(mh);
210:
211: }
212:
213: public void saveToStorage(MessageReference mh, SpyMessage message)
214: throws JMSException {
215: if (delegate == null || mh.inMemory()) {
216: cache.put(mh, message);
217: mh.setStored(MessageReference.STORED);
218: } else
219: ((CacheStore) delegate).saveToStorage(mh, message);
220: }
221:
222: // ServerMBeanSupport overrides -------------------------------------------------
223:
224: protected void startService() throws Exception {
225: //Is there a delegate?
226: if (delegateName != null) {
227: delegate = (org.jboss.mq.pm.PersistenceManager) getServer()
228: .getAttribute(delegateName, "Instance");
229: if ((delegate instanceof CacheStore) == false)
230: throw new UnsupportedOperationException(
231: "The delegate persistence manager must also be a CacheStore");
232: txManager = delegate.getTxManager();
233: } else {
234: txManager = new TxManager(this );
235: }
236: }
237:
238: // Protected --------------------------------------------------------------------
239:
240: // Inner Classes ----------------------------------------------------------------
241: }
|