001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.memory;
031:
032: import java.util.ArrayList;
033: import java.util.logging.*;
034:
035: import javax.jms.*;
036:
037: import com.caucho.jms.connection.*;
038: import com.caucho.jms.message.*;
039: import com.caucho.jms.queue.*;
040:
041: import com.caucho.util.Alarm;
042:
043: /**
044: * Implements a memory queue.
045: */
046: public class MemoryQueue extends AbstractQueue {
047: private static final Logger log = Logger
048: .getLogger(MemoryQueue.class.getName());
049:
050: private ArrayList<MessageImpl> _queueList = new ArrayList<MessageImpl>();
051: // messages waiting for an ack
052: private ArrayList<MessageImpl> _readList = new ArrayList<MessageImpl>();
053:
054: //
055: // JMX configuration
056: //
057:
058: /**
059: * Returns the configuration URL.
060: */
061: public String getUrl() {
062: return "memory:name=" + getName();
063: }
064:
065: //
066: // JMX statistics
067: //
068:
069: /**
070: * Returns the queue size
071: */
072: public int getQueueSize() {
073: synchronized (_queueList) {
074: return _queueList.size();
075: }
076: }
077:
078: /**
079: * Adds the message to the persistent store. Called if there are no
080: * active listeners.
081: */
082: @Override
083: public void send(JmsSession session, MessageImpl msg, long expires) {
084: synchronized (_queueList) {
085: _queueList.add(msg);
086: }
087:
088: notifyMessageAvailable();
089: }
090:
091: /**
092: * Returns true if a message is available.
093: */
094: public boolean hasMessage() {
095: return _queueList.size() > 0;
096: }
097:
098: /**
099: * Polls the next message from the store.
100: */
101: @Override
102: public MessageImpl receive(boolean isAutoAck) {
103: synchronized (_queueList) {
104: if (_queueList.size() == 0)
105: return null;
106:
107: MessageImpl msg = _queueList.remove(0);
108:
109: if (log.isLoggable(Level.FINE))
110: log.fine(this + " receive " + msg
111: + (isAutoAck ? " (auto-ack)" : ""));
112:
113: if (isAutoAck) {
114: return msg;
115: } else {
116: _readList.add(msg);
117: return msg;
118: }
119: }
120: }
121:
122: @Override
123: public ArrayList<MessageImpl> getBrowserList() {
124: synchronized (_queueList) {
125: return new ArrayList<MessageImpl>(_queueList);
126: }
127: }
128:
129: /**
130: * Acknowledges the receipt of a message
131: */
132: @Override
133: public void acknowledge(String msgId) {
134: if (log.isLoggable(Level.FINE))
135: log.fine(this + " acknowledge " + msgId);
136:
137: synchronized (_queueList) {
138: for (int i = _readList.size() - 1; i >= 0; i--) {
139: MessageImpl msg = _readList.get(i);
140:
141: if (msg.getJMSMessageID().equals(msgId))
142: _readList.remove(i);
143: }
144: }
145: }
146:
147: /**
148: * Rolls back the receipt of a message
149: */
150: @Override
151: public void rollback(String msgId) {
152: if (log.isLoggable(Level.FINE))
153: log.fine(this + " rollback " + msgId);
154:
155: synchronized (_queueList) {
156: for (int i = _readList.size() - 1; i >= 0; i--) {
157: MessageImpl msg = _readList.get(i);
158:
159: if (msg.getJMSMessageID().equals(msgId)) {
160: _readList.remove(i);
161: msg.setJMSRedelivered(true);
162: _queueList.add(msg);
163: notifyMessageAvailable();
164: }
165: }
166: }
167: }
168: }
|