001: /*
002: * $Id: TransactionalQueueSession.java 8077 2007-08-27 20:15:25Z aperepel $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.util.queue;
012:
013: import org.mule.util.xa.AbstractXAResourceManager;
014: import org.mule.util.xa.DefaultXASession;
015:
016: import java.io.IOException;
017:
018: /**
019: * A Queue session that is used to manage the transaction context of a Queue
020: */
021: class TransactionalQueueSession extends DefaultXASession implements
022: QueueSession {
023:
024: protected TransactionalQueueManager queueManager;
025:
026: public TransactionalQueueSession(
027: AbstractXAResourceManager resourceManager,
028: TransactionalQueueManager queueManager) {
029: super (resourceManager);
030: this .queueManager = queueManager;
031: }
032:
033: /*
034: * (non-Javadoc)
035: *
036: * @see org.mule.transaction.xa.queue.QueueSession#getQueue(java.lang.String)
037: */
038: public Queue getQueue(String name) {
039: QueueInfo queue = queueManager.getQueue(name);
040: return new QueueImpl(queue);
041: }
042:
043: protected class QueueImpl implements Queue {
044:
045: protected QueueInfo queue;
046:
047: public QueueImpl(QueueInfo queue) {
048: this .queue = queue;
049: }
050:
051: public void put(Object item) throws InterruptedException {
052: offer(item, Long.MAX_VALUE);
053: }
054:
055: public boolean offer(Object item, long timeout)
056: throws InterruptedException {
057: if (localContext != null) {
058: return ((TransactionalQueueManager.QueueTransactionContext) localContext)
059: .offer(queue, item, timeout);
060: } else {
061: try {
062: Object id = queueManager.doStore(queue, item);
063: try {
064: if (!queue.offer(id, 0, timeout)) {
065: queueManager.doRemove(queue, item);
066: return false;
067: } else {
068: return true;
069: }
070: } catch (InterruptedException e) {
071: queueManager.doRemove(queue, item);
072: throw e;
073: }
074: } catch (IOException e) {
075: throw new RuntimeException(e);
076: }
077: }
078: }
079:
080: public Object take() throws InterruptedException {
081: return poll(Long.MAX_VALUE);
082: }
083:
084: public Object poll(long timeout) throws InterruptedException {
085: try {
086: if (localContext != null) {
087: return ((TransactionalQueueManager.QueueTransactionContext) localContext)
088: .poll(queue, timeout);
089: } else {
090: Object id = queue.poll(timeout);
091: if (id != null) {
092: Object item = queueManager.doLoad(queue, id);
093: queueManager.doRemove(queue, id);
094: return item;
095: }
096: return null;
097: }
098: } catch (IOException e) {
099: throw new RuntimeException(e);
100: }
101: }
102:
103: public Object peek() throws InterruptedException {
104: try {
105: if (localContext != null) {
106: return ((TransactionalQueueManager.QueueTransactionContext) localContext)
107: .peek(queue);
108: } else {
109: Object id = queue.peek();
110: if (id != null) {
111: Object item = queueManager.doLoad(queue, id);
112: queueManager.doRemove(queue, id);
113: return item;
114: }
115: return null;
116: }
117: } catch (IOException e) {
118: throw new RuntimeException(e);
119: }
120: }
121:
122: public int size() {
123: if (localContext != null) {
124: return ((TransactionalQueueManager.QueueTransactionContext) localContext)
125: .size(queue);
126: } else {
127: return queue.list.size();
128: }
129: }
130:
131: }
132: }
|