001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.jms.jdbc;
031:
032: import com.caucho.config.ConfigException;
033: import com.caucho.jms.JMSExceptionWrapper;
034: import com.caucho.jms.connection.MessageConsumerImpl;
035: import com.caucho.jms.connection.JmsSession;
036: import com.caucho.jms.message.MessageImpl;
037: import com.caucho.jms.queue.AbstractQueue;
038: import com.caucho.log.Log;
039: import com.caucho.util.L10N;
040: import com.caucho.util.Alarm;
041: import com.caucho.jdbc.*;
042:
043: import javax.annotation.PostConstruct;
044: import javax.jms.JMSException;
045: import javax.jms.Message;
046: import javax.jms.Queue;
047: import javax.jms.QueueBrowser;
048: import java.sql.*;
049: import java.io.*;
050: import javax.sql.*;
051: import java.util.logging.*;
052:
053: /**
054: * A jdbc queue.
055: */
056: public class JdbcQueue extends AbstractQueue {
057: static final Logger log = Logger.getLogger(JdbcQueue.class
058: .getName());
059: static final L10N L = new L10N(JdbcQueue.class);
060:
061: protected JdbcManager _jdbcManager = new JdbcManager();
062:
063: private String _name;
064:
065: private int _id;
066: private int _consumerId;
067:
068: public JdbcQueue() {
069: }
070:
071: /**
072: * Sets the name.
073: */
074: public void setName(String name) {
075: _name = name;
076: }
077:
078: /**
079: * Gets the name.
080: */
081: public String getName() {
082: return _name;
083: }
084:
085: /**
086: * Returns the queue's name.
087: */
088: public String getQueueName() {
089: return getName();
090: }
091:
092: /**
093: * Sets the queue's name.
094: */
095: public void setQueueName(String name) {
096: setName(name);
097: }
098:
099: /**
100: * Returns the JDBC id for the queue.
101: */
102: public int getId() {
103: return _id;
104: }
105:
106: /**
107: * Sets the jdbc manager
108: */
109: public void setJdbcManager(JdbcManager jdbcManager) {
110: _jdbcManager = jdbcManager;
111: }
112:
113: /**
114: * Gets the JDBC manager.
115: */
116: public JdbcManager getJdbcManager() {
117: return _jdbcManager;
118: }
119:
120: /**
121: * Sets the data source.
122: */
123: public void setDataSource(DataSource dataSource) {
124: _jdbcManager.setDataSource(dataSource);
125: }
126:
127: /**
128: * Sets the tablespace for Oracle.
129: */
130: public void setTablespace(String tablespace) {
131: _jdbcManager.setTablespace(tablespace);
132: }
133:
134: /**
135: * Initializes the JdbcQueue
136: */
137: public void init() throws ConfigException {
138: try {
139: if (_jdbcManager.getDataSource() == null)
140: throw new ConfigException(
141: L
142: .l("JdbcQueue requires a <data-source> element."));
143:
144: if (getName() == null)
145: throw new ConfigException(
146: L
147: .l("JdbcQueue requires a <queue-name> element."));
148:
149: _jdbcManager.init();
150:
151: _id = createDestination(getName(), false);
152: } catch (RuntimeException e) {
153: throw e;
154: } catch (Exception e) {
155: throw ConfigException.create(e);
156: }
157: }
158:
159: /**
160: * Creates a consumer.
161: */
162: /*
163: public MessageConsumerImpl createConsumer(JmsSession session,
164: String selector,
165: boolean noWait)
166: throws JMSException
167: {
168: return new JdbcQueueConsumer(session, selector, _jdbcManager, this);
169: }
170: */
171:
172: /**
173: * Creates a browser.
174: */
175: /*
176: public QueueBrowser createBrowser(SessionImpl session, String selector)
177: throws JMSException
178: {
179: return new JdbcQueueBrowser(session, selector, this);
180: }
181: */
182:
183: /**
184: * Sends the message to the queue.
185: */
186: @Override
187: public void send(JmsSession session, MessageImpl message,
188: long expireTime) throws JMSException {
189: try {
190: JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage();
191: jdbcMessage.send(message, _id, expireTime);
192: } catch (Exception e) {
193: throw new JMSExceptionWrapper(e);
194: }
195: }
196:
197: /**
198: * Receives a message from the queue.
199: */
200: @Override
201: public MessageImpl receive(boolean isAutoAck) throws JMSException {
202: try {
203: long minId = -1;
204:
205: DataSource dataSource = _jdbcManager.getDataSource();
206: String messageTable = _jdbcManager.getMessageTable();
207: JdbcMessage jdbcMessage = _jdbcManager.getJdbcMessage();
208:
209: Connection conn = dataSource.getConnection();
210: try {
211: String sql = ("SELECT m_id, msg_type, delivered, body, header"
212: + " FROM "
213: + messageTable
214: + " WHERE ?<m_id AND queue=?"
215: + " AND consumer IS NULL AND ?<=expire" + " ORDER BY m_id");
216:
217: PreparedStatement selectStmt = conn
218: .prepareStatement(sql);
219:
220: try {
221: selectStmt.setFetchSize(1);
222: } catch (Throwable e) {
223: log.log(Level.FINER, e.toString(), e);
224: }
225:
226: if (isAutoAck) {
227: sql = ("DELETE FROM " + messageTable + " WHERE m_id=? AND consumer IS NULL");
228: } else
229: sql = ("UPDATE " + messageTable
230: + " SET consumer=?, delivered=1" + " WHERE m_id=? AND consumer IS NULL");
231:
232: PreparedStatement updateStmt = conn
233: .prepareStatement(sql);
234:
235: long id = -1;
236: while (true) {
237: id = -1;
238:
239: selectStmt.setLong(1, minId);
240: selectStmt.setInt(2, getId());
241: selectStmt.setLong(3, Alarm.getCurrentTime());
242:
243: MessageImpl msg = null;
244:
245: ResultSet rs = selectStmt.executeQuery();
246: while (rs.next()) {
247: id = rs.getLong(1);
248:
249: minId = id;
250:
251: msg = jdbcMessage.readMessage(rs);
252:
253: /*
254: if (_selector == null || _selector.isMatch(msg))
255: break;
256: else
257: msg = null;
258: */
259: if (true)
260: break;
261: }
262:
263: rs.close();
264:
265: if (msg == null)
266: return null;
267:
268: if (isAutoAck) {
269: updateStmt.setLong(1, id);
270: } else {
271: updateStmt.setLong(1, _consumerId);
272: updateStmt.setLong(2, id);
273: }
274:
275: int updateCount = updateStmt.executeUpdate();
276:
277: if (updateCount == 1)
278: return msg;
279: }
280: } finally {
281: conn.close();
282: }
283: } catch (IOException e) {
284: throw new JMSExceptionWrapper(e);
285: } catch (SQLException e) {
286: throw new JMSExceptionWrapper(e);
287: }
288: }
289:
290: /**
291: * Removes the first message matching the selector.
292: */
293: public void commit(int session) throws JMSException {
294: }
295:
296: /**
297: * Creates a queue.
298: */
299: protected int createDestination(String name, boolean isTopic)
300: throws SQLException {
301: Connection conn = _jdbcManager.getDataSource().getConnection();
302: String destinationTable = _jdbcManager.getDestinationTable();
303: String destinationSequence = _jdbcManager
304: .getDestinationSequence();
305:
306: try {
307: String sql = ("SELECT id FROM " + destinationTable + " WHERE name=? AND is_topic=?");
308:
309: PreparedStatement pstmt = conn.prepareStatement(sql);
310: pstmt.setString(1, name);
311: pstmt.setInt(2, isTopic ? 1 : 0);
312:
313: ResultSet rs = pstmt.executeQuery();
314: if (rs.next()) {
315: return rs.getInt(1);
316: }
317: rs.close();
318:
319: if (destinationSequence != null) {
320: JdbcMetaData metaData = _jdbcManager.getMetaData();
321: sql = metaData.selectSequenceSQL(destinationSequence);
322: int id = 0;
323:
324: pstmt = conn.prepareStatement(sql);
325:
326: rs = pstmt.executeQuery();
327: if (rs.next())
328: id = rs.getInt(1);
329: else
330: throw new RuntimeException("can't create sequence");
331:
332: sql = "INSERT INTO " + destinationTable
333: + " (id,name,is_topic) VALUES(?,?,?)";
334:
335: pstmt = conn.prepareStatement(sql);
336:
337: pstmt.setInt(1, id);
338: pstmt.setString(2, name);
339: pstmt.setInt(3, isTopic ? 1 : 0);
340:
341: pstmt.executeUpdate();
342:
343: if (isTopic)
344: log.fine("JMSTopic[" + name + "," + id
345: + "] created");
346: else
347: log.fine("JMSQueue[" + name + "," + id
348: + "] created");
349:
350: return id;
351: } else {
352: sql = "INSERT INTO " + destinationTable
353: + " (name,is_topic) VALUES(?,?)";
354: pstmt = conn.prepareStatement(sql,
355: PreparedStatement.RETURN_GENERATED_KEYS);
356: pstmt.setString(1, name);
357: pstmt.setInt(2, isTopic ? 1 : 0);
358:
359: pstmt.executeUpdate();
360:
361: rs = pstmt.getGeneratedKeys();
362:
363: if (rs.next()) {
364: int id = rs.getInt(1);
365:
366: if (isTopic)
367: log.fine("JMSTopic[" + name + "," + id
368: + "] created");
369: else
370: log.fine("JMSQueue[" + name + "," + id
371: + "] created");
372:
373: return id;
374: } else
375: throw new SQLException(L.l(
376: "can't generate destination for {0}", name));
377: }
378: } finally {
379: conn.close();
380: }
381: }
382:
383: /**
384: * Purges expired messages.
385: */
386: protected void purgeExpiredMessages() {
387: }
388:
389: /**
390: * Returns a printable view of the queue.
391: */
392: public String toString() {
393: return "JdbcQueue[" + getName() + "]";
394: }
395: }
|