001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.pm.jdbc2;
023:
024: import java.io.IOException;
025: import java.sql.Connection;
026: import java.sql.PreparedStatement;
027: import java.sql.ResultSet;
028: import java.sql.SQLException;
029:
030: import javax.jms.JMSException;
031:
032: import org.jboss.mq.SpyMessage;
033: import org.jboss.mq.pm.Tx;
034:
035: /**
036: * OracleThinPersistenceManager.<p>
037: *
038: * Based on information provided by Ron Teeter at TradeBeam Holdings, Inc.
039: *
040: * @author <a href="adrian@jboss.com">Adrian Brock</a>
041: * @version $Revision: 57198 $
042: */
043: public class OracleThinPersistenceManager extends PersistenceManager {
044: /** Insert an empty blob */
045: protected String INSERT_EMPTY_BLOB = "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,EMPTY_BLOB(),?,?)";
046: /** Lock empty blob */
047: protected String LOCK_EMPTY_BLOB = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID = ? AND DESTINATION = ? FOR UPDATE";
048:
049: /**
050: * Create a new OracleThinPersistenceManager.
051: *
052: * @throws JMSException for any error
053: */
054: public OracleThinPersistenceManager() throws JMSException {
055: }
056:
057: protected void add(Connection c, String queue, SpyMessage message,
058: Tx txId, String mark) throws SQLException, IOException {
059: PreparedStatement stmt = null;
060: ResultSet rs = null;
061: try {
062: stmt = c.prepareStatement(INSERT_EMPTY_BLOB);
063:
064: stmt.setLong(1, message.header.messageId);
065: stmt.setString(2, queue);
066:
067: if (txId != null)
068: stmt.setLong(3, txId.longValue());
069: else
070: stmt.setNull(3, java.sql.Types.BIGINT);
071: stmt.setString(4, mark);
072:
073: int count = stmt.executeUpdate();
074: safeClose(stmt, null);
075: if (count != 1)
076: throw new IOException(
077: "Could not insert empty blob in the database: insert affected "
078: + count + " rows. message=" + message);
079:
080: stmt = c.prepareStatement(LOCK_EMPTY_BLOB);
081: stmt.setLong(1, message.header.messageId);
082: stmt.setString(2, queue);
083:
084: rs = stmt.executeQuery();
085: if (rs.next() == false)
086: throw new IOException(
087: "Could not lock empty blob in the database. message="
088: + message);
089: safeClose(stmt, rs);
090:
091: stmt = c.prepareStatement(UPDATE_MESSAGE);
092: setBlob(stmt, 1, message);
093: stmt.setLong(2, message.header.messageId);
094: stmt.setString(3, queue);
095:
096: count = stmt.executeUpdate();
097: safeClose(stmt, null);
098: if (count != 1)
099: throw new IOException(
100: "Could not update real blob in the database: update affected "
101: + count + " rows. message=" + message);
102: } finally {
103: safeClose(stmt, rs);
104: }
105: }
106:
107: public void startService() throws Exception {
108: INSERT_EMPTY_BLOB = sqlProperties.getProperty(
109: "INSERT_EMPTY_BLOB", INSERT_EMPTY_BLOB);
110: LOCK_EMPTY_BLOB = sqlProperties.getProperty("LOCK_EMPTY_BLOB",
111: LOCK_EMPTY_BLOB);
112: super .startService();
113: }
114:
115: protected void safeClose(PreparedStatement stmt, ResultSet rs) {
116: try {
117: if (rs != null) {
118: rs.close();
119: }
120: } catch (SQLException ignored) {
121: log.trace("Ignored", ignored);
122: }
123: try {
124: if (stmt != null) {
125: stmt.close();
126: }
127: } catch (SQLException ignored) {
128: log.trace("Ignored", ignored);
129: }
130: }
131: }
|