001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.file;
031:
032: import java.io.*;
033: import java.util.logging.*;
034: import javax.sql.*;
035: import java.sql.Connection;
036: import java.sql.Statement;
037: import java.sql.PreparedStatement;
038: import java.sql.SQLException;
039: import java.sql.ResultSet;
040:
041: import javax.jms.*;
042: import javax.annotation.*;
043:
044: import com.caucho.jms.queue.*;
045: import com.caucho.jms.message.*;
046: import com.caucho.config.ConfigException;
047: import com.caucho.db.*;
048: import com.caucho.db.jdbc.*;
049: import com.caucho.util.L10N;
050: import com.caucho.java.*;
051: import com.caucho.server.resin.*;
052: import com.caucho.vfs.*;
053:
054: /**
055: * Implements a file queue.
056: */
057: public class FileQueueStore {
058: private static final L10N L = new L10N(FileQueueStore.class);
059: private static final Logger log = Logger
060: .getLogger(FileQueueStore.class.getName());
061:
062: private static final MessageType[] MESSAGE_TYPE = MessageType
063: .values();
064:
065: private Path _path;
066: private DataSource _db;
067: private String _name = "default";
068: private String _tablePrefix = "jms";
069:
070: private MessageFactory _messageFactory;
071:
072: private String _queueTable;
073: private String _messageTable;
074:
075: private long _queueId;
076:
077: private Connection _conn;
078:
079: private PreparedStatement _sendStmt;
080: private PreparedStatement _receiveStartStmt;
081: private PreparedStatement _readStmt;
082: private PreparedStatement _receiveStmt;
083: private PreparedStatement _deleteStmt;
084:
085: public FileQueueStore(MessageFactory messageFactory) {
086: _messageFactory = messageFactory;
087: }
088:
089: public void setName(String name) {
090: _name = name;
091: }
092:
093: public String getName() {
094: return _name;
095: }
096:
097: /**
098: * Sets the path to the database
099: */
100: public void setPath(Path path) {
101: if (!path.exists()) {
102: try {
103: path.mkdirs();
104: } catch (IOException e) {
105: throw ConfigException.create(e);
106: }
107: }
108:
109: if (!path.isDirectory())
110: throw new ConfigException(L.l(
111: "path '{0}' must be a directory", path));
112:
113: _path = path;
114: }
115:
116: /**
117: * Returns the path to the backing database
118: */
119: public Path getPath() {
120: return _path;
121: }
122:
123: public void setTablePrefix(String prefix) {
124: _tablePrefix = prefix;
125: }
126:
127: @PostConstruct
128: public void init() {
129: if (_path == null)
130: _path = WorkDir.getLocalWorkDir();
131:
132: if (!_path.isDirectory())
133: throw new ConfigException(
134: L
135: .l("FileQueue requires a valid persistent directory."));
136:
137: Resin resin = Resin.getLocal();
138:
139: String serverId = null;
140:
141: if (resin != null)
142: serverId = resin.getServerId();
143:
144: if (serverId == null)
145: serverId = "anon";
146: else if ("".equals(serverId))
147: serverId = "default";
148:
149: _queueTable = escapeName("jms_queue_" + serverId);
150: _messageTable = escapeName("jms_message_" + serverId);
151:
152: try {
153: DataSourceImpl db = new DataSourceImpl(_path);
154: db.setRemoveOnError(true);
155: db.init();
156:
157: _db = db;
158:
159: _conn = _db.getConnection();
160:
161: initDatabase();
162:
163: initQueue();
164:
165: initStatements();
166: } catch (SQLException e) {
167: throw ConfigException.create(e);
168: }
169: }
170:
171: /**
172: * Adds a new message to the persistent store.
173: */
174: public long send(MessageImpl msg, long expireTime) {
175: synchronized (this ) {
176: try {
177: _sendStmt.setLong(1, _queueId);
178: _sendStmt.setLong(2, expireTime);
179: _sendStmt.setString(3, msg.getJMSMessageID());
180: _sendStmt.setBinaryStream(4, msg
181: .propertiesToInputStream(), 0);
182: _sendStmt.setInt(5, msg.getType().ordinal());
183: _sendStmt
184: .setBinaryStream(6, msg.bodyToInputStream(), 0);
185:
186: _sendStmt.executeUpdate();
187:
188: ResultSet rs = _sendStmt.getGeneratedKeys();
189:
190: if (!rs.next())
191: throw new java.lang.IllegalStateException();
192:
193: long id = rs.getLong(1);
194:
195: rs.close();
196:
197: return id;
198: } catch (Exception e) {
199: throw new RuntimeException(e);
200: }
201: }
202: }
203:
204: /**
205: * Retrieves a message from the persistent store.
206: */
207: void receiveStart(FileQueue fileQueue) {
208: synchronized (this ) {
209: try {
210: _receiveStartStmt.setLong(1, _queueId);
211:
212: ResultSet rs = _receiveStartStmt.executeQuery();
213:
214: while (rs.next()) {
215: long id = rs.getLong(1);
216: long expire = rs.getLong(2);
217: MessageType type = MESSAGE_TYPE[rs.getInt(3)];
218:
219: FileQueueEntry entry = fileQueue.addEntry(id,
220: expire, type);
221: }
222:
223: rs.close();
224: } catch (Exception e) {
225: throw new RuntimeException(e);
226: }
227: }
228: }
229:
230: /**
231: * Retrieves a message from the persistent store.
232: */
233: public MessageImpl readMessage(long id, MessageType type) {
234: synchronized (this ) {
235: try {
236: _readStmt.setLong(1, id);
237:
238: ResultSet rs = _readStmt.executeQuery();
239:
240: if (rs.next()) {
241: MessageImpl msg;
242:
243: type = MESSAGE_TYPE[rs.getInt(1)];
244:
245: switch (type) {
246: case NULL:
247: msg = new MessageImpl();
248: break;
249: case BYTES:
250: msg = new BytesMessageImpl();
251: break;
252: case MAP:
253: msg = new MapMessageImpl();
254: break;
255: case OBJECT:
256: msg = new ObjectMessageImpl();
257: break;
258: case STREAM:
259: msg = new StreamMessageImpl();
260: break;
261: case TEXT:
262: msg = new TextMessageImpl();
263: break;
264: default:
265: msg = new MessageImpl();
266: break;
267: }
268:
269: String msgId = rs.getString(2);
270:
271: msg.setJMSMessageID(msgId);
272:
273: InputStream is = rs.getBinaryStream(3);
274: if (is != null) {
275: msg.readProperties(is);
276:
277: is.close();
278: }
279:
280: is = rs.getBinaryStream(4);
281: if (is != null) {
282: msg.readBody(is);
283:
284: is.close();
285: }
286:
287: return (MessageImpl) msg;
288: }
289:
290: rs.close();
291: } catch (Exception e) {
292: throw new RuntimeException(e);
293: }
294: }
295:
296: return null;
297: }
298:
299: /**
300: * Retrieves a message from the persistent store.
301: */
302: public MessageImpl receive() {
303: synchronized (this ) {
304: try {
305: _receiveStmt.setLong(1, _queueId);
306:
307: ResultSet rs = _receiveStmt.executeQuery();
308:
309: if (rs.next()) {
310: long id = rs.getLong(1);
311:
312: rs.close();
313:
314: Message msg = _messageFactory
315: .createTextMessage("sample");
316:
317: _deleteStmt.setLong(1, id);
318:
319: _deleteStmt.executeUpdate();
320:
321: return (MessageImpl) msg;
322: }
323:
324: rs.close();
325: } catch (Exception e) {
326: throw new RuntimeException(e);
327: }
328: }
329:
330: return null;
331: }
332:
333: /**
334: * Retrieves a message from the persistent store.
335: */
336: void delete(long id) {
337: synchronized (this ) {
338: try {
339: _deleteStmt.setLong(1, id);
340:
341: _deleteStmt.executeUpdate();
342: } catch (Exception e) {
343: throw new RuntimeException(e);
344: }
345: }
346: }
347:
348: private void initDatabase() throws SQLException {
349: String sql = "select id from " + _queueTable + " where 1=0";
350: Statement stmt = _conn.createStatement();
351:
352: try {
353: ResultSet rs = stmt.executeQuery(sql);
354:
355: rs.close();
356:
357: return;
358: } catch (SQLException e) {
359: log.finer(e.toString());
360: }
361:
362: sql = ("create table " + _queueTable + " ("
363: + " id bigint auto_increment," + " name varchar(128)" + ")");
364:
365: stmt.executeUpdate(sql);
366:
367: sql = ("create table " + _messageTable + " ("
368: + " id bigint auto_increment," + " queue bigint,"
369: + " expire datetime," + " refcount integer,"
370: + " owner bigint," + " msg_id varchar(64),"
371: + " header blob," + " type integer," + " body blob" + ")");
372:
373: stmt.executeUpdate(sql);
374: }
375:
376: private void initQueue() throws SQLException {
377: String sql = "select id from " + _queueTable + " where name=?";
378:
379: PreparedStatement stmt = _conn.prepareStatement(sql);
380: stmt.setString(1, getName());
381:
382: ResultSet rs = stmt.executeQuery();
383: if (rs.next()) {
384: _queueId = rs.getLong(1);
385: rs.close();
386: stmt.close();
387:
388: return;
389: }
390:
391: stmt.close();
392:
393: sql = "insert into " + _queueTable + " (name) values(?)";
394: stmt = _conn.prepareStatement(sql,
395: Statement.RETURN_GENERATED_KEYS);
396:
397: stmt.setString(1, getName());
398:
399: stmt.executeUpdate();
400:
401: rs = stmt.getGeneratedKeys();
402:
403: if (!rs.next())
404: throw new java.lang.IllegalStateException();
405:
406: _queueId = rs.getLong(1);
407:
408: rs.close();
409: stmt.close();
410: }
411:
412: private void initStatements() throws SQLException {
413: String sql = ("insert into " + _messageTable + " (queue,expire,msg_id,header,type,body) VALUES(?,?,?,?,?,?)");
414:
415: _sendStmt = _conn.prepareStatement(sql,
416: Statement.RETURN_GENERATED_KEYS);
417:
418: sql = ("select id,msg_id,header,body from " + _messageTable + " WHERE queue=? LIMIT 1");
419:
420: _receiveStmt = _conn.prepareStatement(sql);
421:
422: sql = ("select type,msg_id,header,body from " + _messageTable + " WHERE id=?");
423:
424: _readStmt = _conn.prepareStatement(sql);
425:
426: sql = ("select id,expire,type from " + _messageTable + " WHERE queue=? ORDER BY id");
427:
428: _receiveStartStmt = _conn.prepareStatement(sql);
429:
430: sql = ("delete from " + _messageTable + " WHERE id=?");
431:
432: _deleteStmt = _conn.prepareStatement(sql);
433: }
434:
435: private static String escapeName(String name) {
436: StringBuilder sb = new StringBuilder();
437:
438: for (int i = 0; i < name.length(); i++) {
439: char ch = name.charAt(i);
440:
441: if ('a' <= ch && ch <= 'z' || 'A' <= ch && ch <= 'Z'
442: || '0' <= ch && ch <= '0' || ch == '_') {
443: sb.append(ch);
444: } else
445: sb.append('_');
446: }
447:
448: return sb.toString();
449: }
450: }
|