001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.jdbc;
031:
032: import com.caucho.config.ConfigException;
033: import com.caucho.jdbc.JdbcMetaData;
034: import com.caucho.jms.queue.AbstractDestination;
035: import com.caucho.util.Alarm;
036: import com.caucho.util.L10N;
037: import com.caucho.util.Log;
038:
039: import javax.annotation.PostConstruct;
040: import javax.sql.DataSource;
041: import java.sql.Connection;
042: import java.sql.PreparedStatement;
043: import java.sql.ResultSet;
044: import java.sql.SQLException;
045: import java.util.logging.Level;
046: import java.util.logging.Logger;
047:
048: /**
049: * Represents a JDBC destination.
050: */
051: abstract public class JdbcDestination extends AbstractDestination {
052: static final Logger log = Logger.getLogger(JdbcDestination.class
053: .getName());
054: static final L10N L = new L10N(JdbcDestination.class);
055:
056: protected JdbcManager _jdbcManager = new JdbcManager();
057:
058: private String _name;
059:
060: private long _lastPurgeTime;
061:
062: public JdbcDestination() {
063: }
064:
065: /**
066: * Sets the name.
067: */
068: public void setName(String name) {
069: _name = name;
070: }
071:
072: /**
073: * Gets the name.
074: */
075: public String getName() {
076: return _name;
077: }
078:
079: /**
080: * Returns true for a topic.
081: */
082: public boolean isTopic() {
083: return false;
084: }
085:
086: /**
087: * Sets the jdbc manager
088: */
089: public void setJdbcManager(JdbcManager jdbcManager) {
090: _jdbcManager = jdbcManager;
091: }
092:
093: /**
094: * Gets the JDBC manager.
095: */
096: public JdbcManager getJdbcManager() {
097: return _jdbcManager;
098: }
099:
100: /**
101: * Sets the data source.
102: */
103: public void setDataSource(DataSource dataSource) {
104: _jdbcManager.setDataSource(dataSource);
105: }
106:
107: /**
108: * Sets the tablespace for Oracle.
109: */
110: public void setTablespace(String tablespace) {
111: _jdbcManager.setTablespace(tablespace);
112: }
113:
114: /**
115: * Initializes the JdbcDestination
116: */
117: @PostConstruct
118: public void init() throws ConfigException, SQLException {
119: _jdbcManager.init();
120: }
121:
122: /**
123: * Creates a queue.
124: */
125: protected int createDestination(String name, boolean isTopic)
126: throws SQLException {
127: Connection conn = _jdbcManager.getDataSource().getConnection();
128: String destinationTable = _jdbcManager.getDestinationTable();
129: String destinationSequence = _jdbcManager
130: .getDestinationSequence();
131:
132: try {
133: String sql = ("SELECT id FROM " + destinationTable + " WHERE name=? AND is_topic=?");
134:
135: PreparedStatement pstmt = conn.prepareStatement(sql);
136: pstmt.setString(1, name);
137: pstmt.setInt(2, isTopic ? 1 : 0);
138:
139: ResultSet rs = pstmt.executeQuery();
140: if (rs.next()) {
141: return rs.getInt(1);
142: }
143: rs.close();
144:
145: if (destinationSequence != null) {
146: JdbcMetaData metaData = _jdbcManager.getMetaData();
147: sql = metaData.selectSequenceSQL(destinationSequence);
148: int id = 0;
149:
150: pstmt = conn.prepareStatement(sql);
151:
152: rs = pstmt.executeQuery();
153: if (rs.next())
154: id = rs.getInt(1);
155: else
156: throw new RuntimeException("can't create sequence");
157:
158: sql = "INSERT INTO " + destinationTable
159: + " (id,name,is_topic) VALUES(?,?,?)";
160:
161: pstmt = conn.prepareStatement(sql);
162:
163: pstmt.setInt(1, id);
164: pstmt.setString(2, name);
165: pstmt.setInt(3, isTopic ? 1 : 0);
166:
167: pstmt.executeUpdate();
168:
169: if (isTopic)
170: log.fine("JMSTopic[" + name + "," + id
171: + "] created");
172: else
173: log.fine("JMSQueue[" + name + "," + id
174: + "] created");
175:
176: return id;
177: } else {
178: sql = "INSERT INTO " + destinationTable
179: + " (name,is_topic) VALUES(?,?)";
180: pstmt = conn.prepareStatement(sql,
181: PreparedStatement.RETURN_GENERATED_KEYS);
182: pstmt.setString(1, name);
183: pstmt.setInt(2, isTopic ? 1 : 0);
184:
185: pstmt.executeUpdate();
186:
187: rs = pstmt.getGeneratedKeys();
188:
189: if (rs.next()) {
190: int id = rs.getInt(1);
191:
192: if (isTopic)
193: log.fine("JMSTopic[" + name + "," + id
194: + "] created");
195: else
196: log.fine("JMSQueue[" + name + "," + id
197: + "] created");
198:
199: return id;
200: } else
201: throw new SQLException(L.l(
202: "can't generate destination for {0}", name));
203: }
204: } finally {
205: conn.close();
206: }
207: }
208:
209: /**
210: * Purges expired messages.
211: */
212: protected void purgeExpiredMessages() {
213: long purgeInterval = _jdbcManager.getPurgeInterval();
214: long now = Alarm.getCurrentTime();
215:
216: if (now < _lastPurgeTime + purgeInterval)
217: return;
218:
219: _lastPurgeTime = now;
220:
221: try {
222: DataSource dataSource = _jdbcManager.getDataSource();
223: String messageTable = _jdbcManager.getMessageTable();
224: JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage();
225:
226: Connection conn = dataSource.getConnection();
227: try {
228: String sql = ("DELETE FROM " + messageTable + " WHERE expire < ? AND consumer IS NULL");
229:
230: PreparedStatement pstmt = conn.prepareStatement(sql);
231: pstmt.setLong(1, Alarm.getCurrentTime());
232:
233: int count = pstmt.executeUpdate();
234:
235: if (count > 0)
236: log.fine("JMSQueue[" + getName() + "] purged "
237: + count + " expired mesages");
238:
239: pstmt.close();
240: } finally {
241: conn.close();
242: }
243: } catch (Exception e) {
244: log.log(Level.FINER, e.toString(), e);
245: }
246: }
247: }
|