001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.file;
031:
032: import java.io.*;
033: import java.util.logging.*;
034: import java.sql.*;
035:
036: import javax.jms.*;
037: import javax.annotation.*;
038: import javax.webbeans.*;
039:
040: import com.caucho.jms.queue.*;
041: import com.caucho.jms.message.*;
042: import com.caucho.jms.connection.*;
043: import com.caucho.config.ConfigException;
044: import com.caucho.db.*;
045: import com.caucho.util.L10N;
046: import com.caucho.java.*;
047: import com.caucho.vfs.*;
048:
049: /**
050: * A JMS queue backed by a file-based database.
051: *
052: * The URL looks like
053: * <pre>
054: * file:name=my-name;path=file:/var/www/webapps/test/WEB-INFjms
055: * </pre>
056: *
057: * It is configured as:
058: * <pre>
059: * <jms-queue jndi-name="jms/my-name" uri="file:path=WEB-INF/jms"/>
060: * </pre>
061: */
062: public class FileQueue extends AbstractQueue implements Topic {
063: private static final L10N L = new L10N(FileQueue.class);
064: private static final Logger log = Logger.getLogger(FileQueue.class
065: .getName());
066:
067: private final FileQueueStore _store;
068:
069: private final Object _queueLock = new Object();
070:
071: private FileQueueEntry _head;
072: private FileQueueEntry _tail;
073:
074: @In
075: public FileQueue() {
076: _store = new FileQueueStore(_messageFactory);
077: }
078:
079: public FileQueue(String name) {
080: this ();
081:
082: setName(name);
083:
084: init();
085: }
086:
087: //
088: // Configuration
089: //
090:
091: /**
092: * Sets the path to the backing database
093: */
094: public void setPath(Path path) {
095: _store.setPath(path);
096: }
097:
098: public Path getPath() {
099: return _store.getPath();
100: }
101:
102: public void setTablePrefix(String prefix) {
103: _store.setTablePrefix(prefix);
104: }
105:
106: //
107: // JMX configuration items
108: //
109:
110: /**
111: * Returns the JMS configuration url.
112: */
113: public String getUrl() {
114: return "file:name=" + getName() + ";path="
115: + _store.getPath().getURL();
116: }
117:
118: //
119: // JMX stats
120: //
121:
122: public int getQueueSize() {
123: synchronized (_queueLock) {
124: int count = 0;
125:
126: for (FileQueueEntry entry = _head; entry != null; entry = entry._next) {
127: count++;
128: }
129:
130: return count;
131: }
132: }
133:
134: /**
135: * Initialize the queue
136: */
137: public void init() {
138: _store.setName(getName());
139:
140: _store.init();
141:
142: _store.receiveStart(this );
143: }
144:
145: /**
146: * Adds a message entry from startup.
147: */
148: FileQueueEntry addEntry(long id, long expire, MessageType type) {
149: synchronized (_queueLock) {
150: FileQueueEntry entry = new FileQueueEntry(id, expire, type);
151:
152: entry._prev = _tail;
153:
154: if (_tail != null)
155: _tail._next = entry;
156: else
157: _head = entry;
158:
159: _tail = entry;
160:
161: return entry;
162: }
163: }
164:
165: /**
166: * Adds the message to the persistent store. Called if there are no
167: * active listeners.
168: *
169: * @param msg the message to store
170: * @param expires the expires time
171: */
172: @Override
173: public void send(JmsSession session, MessageImpl msg, long expires) {
174: synchronized (_queueLock) {
175: long id = _store.send(msg, expires);
176:
177: FileQueueEntry entry = addEntry(id, expires, null);
178:
179: entry.setMessage(msg);
180: }
181:
182: notifyMessageAvailable();
183: }
184:
185: /**
186: * Polls the next message from the store. If no message is available,
187: * wait for the timeout.
188: */
189: @Override
190: public MessageImpl receive(boolean isAutoAck) {
191: synchronized (_queueLock) {
192: for (FileQueueEntry entry = _head; entry != null; entry = entry._next) {
193: if (!entry.isRead()) {
194: entry.setRead(true);
195:
196: MessageImpl msg = entry.getMessage();
197:
198: if (msg == null) {
199: msg = _store.readMessage(entry.getId(), entry
200: .getType());
201: entry.setMessage(msg);
202: }
203:
204: if (isAutoAck) {
205: removeEntry(entry);
206: _store.delete(entry.getId());
207: }
208:
209: return msg;
210: }
211: }
212:
213: return null;
214: }
215: }
216:
217: /**
218: * Rollsback the message from the store.
219: */
220: @Override
221: public void rollback(String msgId) {
222: synchronized (_queueLock) {
223: for (FileQueueEntry entry = _head; entry != null; entry = entry._next) {
224: MessageImpl msg = entry.getMessage();
225:
226: if (msg != null && msgId.equals(msg.getJMSMessageID())
227: && entry.isRead()) {
228: entry.setRead(false);
229: msg.setJMSRedelivered(true);
230: return;
231: }
232: }
233: }
234: }
235:
236: /**
237: * Rollsback the message from the store.
238: */
239: @Override
240: public void acknowledge(String msgId) {
241: synchronized (_queueLock) {
242: for (FileQueueEntry entry = _head; entry != null; entry = entry._next) {
243: if (entry.getMessage().getJMSMessageID().equals(msgId)
244: && entry.isRead()) {
245: removeEntry(entry);
246: _store.delete(entry.getId());
247: return;
248: }
249: }
250: }
251: }
252:
253: private void removeEntry(FileQueueEntry entry) {
254: FileQueueEntry prev = entry._prev;
255: FileQueueEntry next = entry._next;
256:
257: if (prev != null)
258: prev._next = next;
259: else
260: _head = next;
261:
262: if (next != null)
263: next._prev = prev;
264: else
265: _tail = prev;
266: }
267: }
|