001: /*
002: * $Id: TransactionalQueueManager.java 10380 2008-01-18 10:08:25Z akuzmin $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.util.queue;
012:
013: import org.mule.util.queue.QueuePersistenceStrategy.Holder;
014: import org.mule.util.xa.AbstractTransactionContext;
015: import org.mule.util.xa.AbstractXAResourceManager;
016: import org.mule.util.xa.ResourceManagerException;
017: import org.mule.util.xa.ResourceManagerSystemException;
018:
019: import java.io.IOException;
020: import java.util.ArrayList;
021: import java.util.HashMap;
022: import java.util.Iterator;
023: import java.util.LinkedList;
024: import java.util.List;
025: import java.util.Map;
026:
027: import javax.transaction.xa.XAResource;
028:
029: import org.apache.commons.logging.Log;
030: import org.apache.commons.logging.LogFactory;
031:
032: /**
033: * The Transactional Queue Manager is responsible for creating and Managing
034: * transactional Queues. Queues can also be persistent by setting a persistence
035: * strategy on the manager. Default straties are provided for Memory, Jounaling,
036: * Cache and File.
037: */
038: public class TransactionalQueueManager extends
039: AbstractXAResourceManager implements QueueManager {
040:
041: private static Log logger = LogFactory
042: .getLog(TransactionalQueueManager.class);
043:
044: private Map queues = new HashMap();
045:
046: private QueuePersistenceStrategy memoryPersistenceStrategy = new MemoryPersistenceStrategy();
047: private QueuePersistenceStrategy persistenceStrategy;
048:
049: private QueueConfiguration defaultQueueConfiguration = new QueueConfiguration(
050: false);
051:
052: public synchronized QueueSession getQueueSession() {
053: return new TransactionalQueueSession(this , this );
054: }
055:
056: public synchronized void setDefaultQueueConfiguration(
057: QueueConfiguration config) {
058: this .defaultQueueConfiguration = config;
059: }
060:
061: public synchronized void setQueueConfiguration(String queueName,
062: QueueConfiguration config) {
063: getQueue(queueName).config = config;
064: }
065:
066: protected synchronized QueueInfo getQueue(String name) {
067: QueueInfo q = (QueueInfo) queues.get(name);
068: if (q == null) {
069: q = new QueueInfo();
070: q.name = name;
071: q.list = new LinkedList();
072: q.config = defaultQueueConfiguration;
073: queues.put(name, q);
074: }
075: return q;
076: }
077:
078: /*
079: * (non-Javadoc)
080: *
081: * @see org.mule.transaction.xa.AbstractResourceManager#getLogger()
082: */
083: protected Log getLogger() {
084: return logger;
085: }
086:
087: public void close() {
088: try {
089: stop(SHUTDOWN_MODE_NORMAL);
090: } catch (ResourceManagerException e) {
091: // TODO MULE-863: What should we really do?
092: logger.error("Error disposing manager", e);
093: }
094: }
095:
096: protected void doStart() throws ResourceManagerSystemException {
097: if (persistenceStrategy != null) {
098: try {
099: persistenceStrategy.open();
100: } catch (IOException e) {
101: throw new ResourceManagerSystemException(e);
102: }
103: }
104: }
105:
106: protected boolean shutdown(int mode, long timeoutMSecs) {
107: try {
108: if (persistenceStrategy != null) {
109: persistenceStrategy.close();
110: }
111: } catch (IOException e) {
112: // TODO MULE-863: What should we really do?
113: logger.error("Error closing persistent store", e);
114: }
115: return super .shutdown(mode, timeoutMSecs);
116: }
117:
118: protected void recover() throws ResourceManagerSystemException {
119: if (persistenceStrategy != null) {
120: try {
121: List msgs = persistenceStrategy.restore();
122: for (Iterator it = msgs.iterator(); it.hasNext();) {
123: Holder h = (Holder) it.next();
124: getQueue(h.getQueue()).putNow(h.getId());
125: }
126: } catch (Exception e) {
127: throw new ResourceManagerSystemException(e);
128: }
129: }
130: }
131:
132: /*
133: * (non-Javadoc)
134: *
135: * @see org.mule.transaction.xa.AbstractResourceManager#createTransactionContext()
136: */
137: protected AbstractTransactionContext createTransactionContext(
138: Object session) {
139: return new QueueTransactionContext();
140: }
141:
142: /*
143: * (non-Javadoc)
144: *
145: * @see org.mule.transaction.xa.AbstractResourceManager#doBegin(org.mule.transaction.xa.AbstractTransactionContext)
146: */
147: protected void doBegin(AbstractTransactionContext context) {
148: // Nothing special to do
149: }
150:
151: /*
152: * (non-Javadoc)
153: *
154: * @see org.mule.transaction.xa.AbstractResourceManager#doPrepare(org.mule.transaction.xa.AbstractTransactionContext)
155: */
156: protected int doPrepare(AbstractTransactionContext context) {
157: return XAResource.XA_OK;
158: }
159:
160: /*
161: * (non-Javadoc)
162: *
163: * @see org.mule.transaction.xa.AbstractResourceManager#doCommit(org.mule.transaction.xa.AbstractTransactionContext)
164: */
165: protected void doCommit(AbstractTransactionContext context)
166: throws ResourceManagerException {
167: QueueTransactionContext ctx = (QueueTransactionContext) context;
168: try {
169: if (ctx.added != null) {
170: for (Iterator it = ctx.added.entrySet().iterator(); it
171: .hasNext();) {
172: Map.Entry entry = (Map.Entry) it.next();
173: QueueInfo queue = (QueueInfo) entry.getKey();
174: List queueAdded = (List) entry.getValue();
175: if (queueAdded != null && queueAdded.size() > 0) {
176: for (Iterator itAdded = queueAdded.iterator(); itAdded
177: .hasNext();) {
178: Object object = itAdded.next();
179: Object id = doStore(queue, object);
180: queue.putNow(id);
181: }
182: }
183: }
184: }
185: if (ctx.removed != null) {
186: for (Iterator it = ctx.removed.entrySet().iterator(); it
187: .hasNext();) {
188: Map.Entry entry = (Map.Entry) it.next();
189: QueueInfo queue = (QueueInfo) entry.getKey();
190: List queueRemoved = (List) entry.getValue();
191: if (queueRemoved != null && queueRemoved.size() > 0) {
192: for (Iterator itRemoved = queueRemoved
193: .iterator(); itRemoved.hasNext();) {
194: Object id = itRemoved.next();
195: doRemove(queue, id);
196: }
197: }
198: }
199: }
200: } catch (Exception e) {
201: // throw new ResourceManagerException("Could not commit
202: // transaction", e);
203: // TODO: add an i18n Message
204: throw new ResourceManagerException(e);
205: } finally {
206: ctx.added = null;
207: ctx.removed = null;
208: }
209: }
210:
211: protected Object doStore(QueueInfo queue, Object object)
212: throws IOException {
213: QueuePersistenceStrategy ps = (queue.config.persistent) ? persistenceStrategy
214: : memoryPersistenceStrategy;
215: Object id = ps.store(queue.name, object);
216: return id;
217: }
218:
219: protected void doRemove(QueueInfo queue, Object id)
220: throws IOException {
221: QueuePersistenceStrategy ps = (queue.config.persistent) ? persistenceStrategy
222: : memoryPersistenceStrategy;
223: ps.remove(queue.name, id);
224: }
225:
226: protected Object doLoad(QueueInfo queue, Object id)
227: throws IOException {
228: QueuePersistenceStrategy ps = (queue.config.persistent) ? persistenceStrategy
229: : memoryPersistenceStrategy;
230: Object obj = ps.load(queue.name, id);
231: return obj;
232: }
233:
234: /*
235: * (non-Javadoc)
236: *
237: * @see org.mule.transaction.xa.AbstractResourceManager#doRollback(org.mule.transaction.xa.AbstractTransactionContext)
238: */
239: protected void doRollback(AbstractTransactionContext context)
240: throws ResourceManagerException {
241: QueueTransactionContext ctx = (QueueTransactionContext) context;
242: if (ctx.removed != null) {
243: for (Iterator it = ctx.removed.entrySet().iterator(); it
244: .hasNext();) {
245: Map.Entry entry = (Map.Entry) it.next();
246: QueueInfo queue = (QueueInfo) entry.getKey();
247: List queueRemoved = (List) entry.getValue();
248: if (queueRemoved != null && queueRemoved.size() > 0) {
249: for (Iterator itRemoved = queueRemoved.iterator(); itRemoved
250: .hasNext();) {
251: Object id = itRemoved.next();
252: queue.putNow(id);
253: }
254: }
255: }
256: }
257: ctx.added = null;
258: ctx.removed = null;
259: }
260:
261: protected class QueueTransactionContext extends
262: AbstractTransactionContext {
263: protected Map added;
264: protected Map removed;
265:
266: public boolean offer(QueueInfo queue, Object item, long timeout)
267: throws InterruptedException {
268: readOnly = false;
269: if (added == null) {
270: added = new HashMap();
271: }
272: List queueAdded = (List) added.get(queue);
273: if (queueAdded == null) {
274: queueAdded = new ArrayList();
275: added.put(queue, queueAdded);
276: }
277: // wait for enough room
278: if (queue.offer(null, queueAdded.size(), timeout)) {
279: queueAdded.add(item);
280: return true;
281: } else {
282: return false;
283: }
284: }
285:
286: public Object poll(QueueInfo queue, long timeout)
287: throws IOException, InterruptedException {
288: readOnly = false;
289: if (added != null) {
290: List queueAdded = (List) added.get(queue);
291: if (queueAdded != null) {
292: return queueAdded.remove(queueAdded.size() - 1);
293: }
294: }
295: Object o = queue.poll(timeout);
296: if (o != null) {
297: if (removed == null) {
298: removed = new HashMap();
299: }
300: List queueRemoved = (List) removed.get(queue);
301: if (queueRemoved == null) {
302: queueRemoved = new ArrayList();
303: removed.put(queue, queueRemoved);
304: }
305: queueRemoved.add(o);
306: o = doLoad(queue, o);
307: }
308: return o;
309: }
310:
311: public Object peek(QueueInfo queue) throws IOException,
312: InterruptedException {
313: readOnly = false;
314: if (added != null) {
315: List queueAdded = (List) added.get(queue);
316: if (queueAdded != null) {
317: return queueAdded.get(queueAdded.size() - 1);
318: }
319: }
320: Object o = queue.peek();
321: if (o != null) {
322: o = doLoad(queue, o);
323: }
324: return o;
325: }
326:
327: public int size(QueueInfo queue) {
328: int sz = queue.list.size();
329: if (added != null) {
330: List queueAdded = (List) added.get(queue);
331: if (queueAdded != null) {
332: sz += queueAdded.size();
333: }
334: }
335: return sz;
336: }
337:
338: }
339:
340: /**
341: * @return Returns the persistenceStrategy.
342: */
343: public QueuePersistenceStrategy getPersistenceStrategy() {
344: return persistenceStrategy;
345: }
346:
347: /**
348: * @param persistenceStrategy The persistenceStrategy to set.
349: */
350: public void setPersistenceStrategy(
351: QueuePersistenceStrategy persistenceStrategy) {
352: if (operationMode != OPERATION_MODE_STOPPED) {
353: throw new IllegalStateException();
354: }
355: this .persistenceStrategy = persistenceStrategy;
356: }
357:
358: public QueuePersistenceStrategy getMemoryPersistenceStrategy() {
359: return memoryPersistenceStrategy;
360: }
361:
362: public void setMemoryPersistenceStrategy(
363: QueuePersistenceStrategy memoryPersistenceStrategy) {
364: if (operationMode != OPERATION_MODE_STOPPED) {
365: throw new IllegalStateException();
366: }
367: this.memoryPersistenceStrategy = memoryPersistenceStrategy;
368: }
369: }
|