0001: /*
0002: * JBoss, Home of Professional Open Source.
0003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004: * as indicated by the @author tags. See the copyright.txt file in the
0005: * distribution for a full listing of individual contributors.
0006: *
0007: * This is free software; you can redistribute it and/or modify it
0008: * under the terms of the GNU Lesser General Public License as
0009: * published by the Free Software Foundation; either version 2.1 of
0010: * the License, or (at your option) any later version.
0011: *
0012: * This software is distributed in the hope that it will be useful,
0013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015: * Lesser General Public License for more details.
0016: *
0017: * You should have received a copy of the GNU Lesser General Public
0018: * License along with this software; if not, write to the Free
0019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021: */
0022: package org.jboss.mq.pm.jdbc2;
0023:
0024: import java.io.ByteArrayInputStream;
0025: import java.io.ByteArrayOutputStream;
0026: import java.io.IOException;
0027: import java.io.ObjectInputStream;
0028: import java.io.ObjectOutputStream;
0029: import java.io.StreamCorruptedException;
0030: import java.sql.Connection;
0031: import java.sql.PreparedStatement;
0032: import java.sql.ResultSet;
0033: import java.sql.SQLException;
0034: import java.util.HashMap;
0035: import java.util.Iterator;
0036: import java.util.Map;
0037: import java.util.Properties;
0038:
0039: import javax.jms.JMSException;
0040: import javax.management.AttributeNotFoundException;
0041: import javax.management.InstanceNotFoundException;
0042: import javax.management.MBeanException;
0043: import javax.management.ObjectName;
0044: import javax.management.ReflectionException;
0045: import javax.naming.InitialContext;
0046: import javax.naming.NamingException;
0047: import javax.sql.DataSource;
0048: import javax.transaction.Status;
0049: import javax.transaction.Transaction;
0050: import javax.transaction.TransactionManager;
0051: import javax.transaction.xa.Xid;
0052:
0053: import org.jboss.mq.SpyDestination;
0054: import org.jboss.mq.SpyJMSException;
0055: import org.jboss.mq.SpyMessage;
0056: import org.jboss.mq.SpyTopic;
0057: import org.jboss.mq.pm.CacheStore;
0058: import org.jboss.mq.pm.Tx;
0059: import org.jboss.mq.pm.TxManager;
0060: import org.jboss.mq.server.JMSDestination;
0061: import org.jboss.mq.server.MessageCache;
0062: import org.jboss.mq.server.MessageReference;
0063: import org.jboss.system.ServiceMBeanSupport;
0064: import org.jboss.tm.TransactionManagerService;
0065: import org.jboss.tm.TransactionTimeoutConfiguration;
0066: import org.jboss.util.UnreachableStatementException;
0067:
0068: import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
0069:
0070: /**
0071: * This class manages all persistence related services for JDBC based
0072: * persistence.
0073: *
0074: * @author Jayesh Parayali (jayeshpk1@yahoo.com)
0075: * @author Hiram Chirino (cojonudo14@hotmail.com)
0076: * @author Adrian Brock (adrian@jboss.com)
0077: * @version $Revision: 61581 $
0078: */
0079: public class PersistenceManager extends ServiceMBeanSupport implements
0080: PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager,
0081: CacheStore {
/**
 * FAQ-style hint about concurrency problems: lists common reasons for
 * missing messages (several JBossMQ instances sharing one database, or a
 * replicating database that lags behind).
 */
private static String CONCURRENCY_WARNING = "\nCommon reasons for missing messages are \n1) You have multiple JBossMQs running over the same database.\n2) You are using a replicating database that is not keeping up with replication.";

/////////////////////////////////////////////////////////////////////////////////
//
// TX state attributes
//
/////////////////////////////////////////////////////////////////////////////////

/**
 * The next transaction id. Seeded during recovery from the highest TXID
 * found in the database and incremented for every new persistent Tx.
 */
protected SynchronizedLong nextTransactionId = new SynchronizedLong(
0l);

/**
 * The JBossMQ transaction manager (org.jboss.mq.pm.TxManager). Created in
 * the constructor; used during recovery to restore and look up prepared
 * transactions.
 */
protected TxManager txManager;

/** The DataSource */
protected DataSource datasource;

/**
 * The JTA transaction manager (javax.transaction.TransactionManager) used
 * to suspend, begin, commit and roll back the transactions that bracket
 * the JDBC work.
 */
protected TransactionManager tm;

/**
 * The override recovery timeout applied to the transaction manager while a
 * queue is restored; 0 means the timeout is not overridden.
 */
private int recoveryTimeout = 0;

/** The recovery retries. NOTE(review): not used in the visible part of this file - confirm semantics. */
private int recoveryRetries = 0;

/**
 * The recover messages chunk. 0 loads the messages of a destination with a
 * single query; any other value loads the message keys first and then
 * fetches each message with its own query.
 */
private int recoverMessagesChunk = 0;

/** The statement retries. NOTE(review): not used in the visible part of this file - confirm semantics. */
private int statementRetries = 5;
0115:
0116: /////////////////////////////////////////////////////////////////////////////////
0117: //
0118: // JDBC Access Attributes
0119: //
0120: /////////////////////////////////////////////////////////////////////////////////
0121:
0122: protected String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?";
0123: protected String UPDATE_MARKED_MESSAGES_XARECOVERY = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID NOT IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID IS NOT NULL)";
0124: protected String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
0125: protected String DELETE_MARKED_MESSAGES_WITH_TX = "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?";
0126: protected String DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY = "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID = NULL) AND TXOP=?";
0127: protected String DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?";
0128: protected String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?";
0129: protected String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXOP = 'T'";
0130: protected String INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)";
0131: protected String INSERT_TX_XARECOVERY = "INSERT INTO JMS_TRANSACTIONS (TXID, XID) values(?, ?)";
0132: protected String DELETE_ALL_TX = "DELETE FROM JMS_TRANSACTIONS";
0133: protected String DELETE_ALL_TX_XARECOVERY = "DELETE FROM JMS_TRANSACTIONS WHERE XID = NULL";
0134: protected String SELECT_MAX_TX = "SELECT MAX(TXID) FROM (SELECT MAX(TXID) FROM JMS_TRANSACTIONS UNION SELECT MAX(TXID) FROM JMS_MESSAGES)";
0135: protected String SELECT_ALL_TX_XARECOVERY = "SELECT TXID, XID FROM JMS_TRANSACTIONS";
0136: protected String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?";
0137: protected String SELECT_MESSAGES_IN_DEST_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE DESTINATION=?";
0138: protected String SELECT_MESSAGE_KEYS_IN_DEST = "SELECT MESSAGEID FROM JMS_MESSAGES WHERE DESTINATION=?";
0139: protected String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
0140: protected String SELECT_MESSAGE_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
0141: protected String INSERT_MESSAGE = "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)";
0142: protected String MARK_MESSAGE = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
0143: protected String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
0144: protected String UPDATE_MESSAGE = "UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
0145: protected String CREATE_MESSAGE_TABLE = "CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, "
0146: + "DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1),"
0147: + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
0148: protected String CREATE_IDX_MESSAGE_TXOP_TXID = "CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)";
0149: protected String CREATE_IDX_MESSAGE_DESTINATION = "CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)";
0150: protected String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )";
0151: protected String CREATE_TX_TABLE_XARECOVERY = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, XID OBJECT, PRIMARY KEY (TXID) )";
0152:
/** Blob handling: the value is read/written with getObject/setObject. */
protected static final int OBJECT_BLOB = 0;
/** Blob handling: the value is serialized to a byte[] and read/written with getBytes/setBytes. */
protected static final int BYTES_BLOB = 1;
/** Blob handling: the value is read through ResultSet.getBinaryStream. */
protected static final int BINARYSTREAM_BLOB = 2;
/** Blob handling: the value is read through ResultSet.getBlob(...).getBinaryStream(). */
protected static final int BLOB_BLOB = 3;

/** The blob handling in use; one of the *_BLOB constants above. */
protected int blobType = OBJECT_BLOB;
/** Whether createSchema() should attempt to create the tables and indexes. */
protected boolean createTables;

/** NOTE(review): not used in the visible part of this file - presumably the number of attempts to obtain a connection; confirm. */
protected int connectionRetryAttempts = 5;

/**
 * Whether XA recovery is enabled. When true the *_XARECOVERY SQL variants
 * are used and prepared transactions are restored during recovery.
 */
protected boolean xaRecovery = false;
0164:
0165: /////////////////////////////////////////////////////////////////////////////////
0166: //
0167: // Constructor.
0168: //
0169: /////////////////////////////////////////////////////////////////////////////////
/**
 * Creates the persistence manager together with the JBossMQ
 * {@link TxManager} that is bound to this instance.
 *
 * @throws javax.jms.JMSException if the TxManager cannot be created
 */
public PersistenceManager() throws javax.jms.JMSException {
    this.txManager = new TxManager(this);
}
0173:
0174: /**
0175: * This inner class helps handle the tx management of the jdbc connections.
0176: *
0177: */
0178: protected class TransactionManagerStrategy {
0179:
0180: Transaction threadTx;
0181:
0182: void startTX() throws JMSException {
0183: try {
0184: // Thread arriving must be clean (jboss doesn't set the thread
0185: // previously). However optimized calls come with associated
0186: // thread for example. We suspend the thread association here, and
0187: // resume in the finally block of the following try.
0188: threadTx = tm.suspend();
0189:
0190: // Always begin a transaction
0191: tm.begin();
0192: } catch (Exception e) {
0193: try {
0194: if (threadTx != null)
0195: tm.resume(threadTx);
0196: } catch (Exception ignore) {
0197: }
0198: throw new SpyJMSException(
0199: "Could not start a transaction with the transaction manager.",
0200: e);
0201: }
0202: }
0203:
0204: void setRollbackOnly() throws JMSException {
0205: try {
0206: tm.setRollbackOnly();
0207: } catch (Exception e) {
0208: throw new SpyJMSException(
0209: "Could not start a mark the transaction for rollback .",
0210: e);
0211: }
0212: }
0213:
0214: void endTX() throws JMSException {
0215: try {
0216: if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
0217: tm.rollback();
0218: } else {
0219: tm.commit();
0220: }
0221: } catch (Exception e) {
0222: throw new SpyJMSException(
0223: "Could not start a transaction with the transaction manager.",
0224: e);
0225: } finally {
0226: try {
0227: if (threadTx != null)
0228: tm.resume(threadTx);
0229: } catch (Exception ignore) {
0230: }
0231: }
0232: }
0233: }
0234:
0235: /////////////////////////////////////////////////////////////////////////////////
0236: //
0237: // TX Resolution.
0238: //
0239: /////////////////////////////////////////////////////////////////////////////////
0240:
0241: synchronized protected void createSchema() throws JMSException {
0242: TransactionManagerStrategy tms = new TransactionManagerStrategy();
0243: tms.startTX();
0244: Connection c = null;
0245: PreparedStatement stmt = null;
0246: boolean threadWasInterrupted = Thread.interrupted();
0247: try {
0248: if (createTables) {
0249: c = this .getConnection();
0250:
0251: boolean createdMessageTable = false;
0252: try {
0253: stmt = c.prepareStatement(CREATE_MESSAGE_TABLE);
0254: stmt.executeUpdate();
0255: createdMessageTable = true;
0256: } catch (SQLException e) {
0257: log.debug("Could not create table with SQL: "
0258: + CREATE_MESSAGE_TABLE, e);
0259: } finally {
0260: try {
0261: if (stmt != null)
0262: stmt.close();
0263: } catch (Throwable ignored) {
0264: log.trace("Ignored: " + ignored);
0265: }
0266: stmt = null;
0267: }
0268:
0269: if (createdMessageTable) {
0270: try {
0271: stmt = c
0272: .prepareStatement(CREATE_IDX_MESSAGE_TXOP_TXID);
0273: stmt.executeUpdate();
0274: } catch (SQLException e) {
0275: log.debug("Could not create index with SQL: "
0276: + CREATE_IDX_MESSAGE_TXOP_TXID, e);
0277: } finally {
0278: try {
0279: if (stmt != null)
0280: stmt.close();
0281: } catch (Throwable ignored) {
0282: log.trace("Ignored: " + ignored);
0283: }
0284: stmt = null;
0285: }
0286: try {
0287: stmt = c
0288: .prepareStatement(CREATE_IDX_MESSAGE_DESTINATION);
0289: stmt.executeUpdate();
0290: } catch (SQLException e) {
0291: log.debug("Could not create index with SQL: "
0292: + CREATE_IDX_MESSAGE_DESTINATION, e);
0293: } finally {
0294: try {
0295: if (stmt != null)
0296: stmt.close();
0297: } catch (Throwable ignored) {
0298: log.trace("Ignored: " + ignored);
0299: }
0300: stmt = null;
0301: }
0302: }
0303:
0304: String createTxTable = CREATE_TX_TABLE;
0305: if (xaRecovery)
0306: createTxTable = CREATE_TX_TABLE_XARECOVERY;
0307: try {
0308: stmt = c.prepareStatement(createTxTable);
0309: stmt.executeUpdate();
0310: } catch (SQLException e) {
0311: log.debug("Could not create table with SQL: "
0312: + createTxTable, e);
0313: } finally {
0314: try {
0315: if (stmt != null)
0316: stmt.close();
0317: } catch (Throwable ignored) {
0318: log.trace("Ignored: " + ignored);
0319: }
0320: stmt = null;
0321: }
0322: }
0323: } catch (SQLException e) {
0324: tms.setRollbackOnly();
0325: throw new SpyJMSException(
0326: "Could not get a connection for jdbc2 table construction ",
0327: e);
0328: } finally {
0329: try {
0330: if (stmt != null)
0331: stmt.close();
0332: } catch (Throwable ignore) {
0333: }
0334: stmt = null;
0335: try {
0336: if (c != null)
0337: c.close();
0338: } catch (Throwable ignore) {
0339: }
0340: c = null;
0341: tms.endTX();
0342:
0343: // Restore the interrupted state of the thread
0344: if (threadWasInterrupted)
0345: Thread.currentThread().interrupt();
0346: }
0347: }
0348:
0349: synchronized protected void resolveAllUncommitedTXs()
0350: throws JMSException {
0351: // We perform recovery in a different thread to the table creation
0352: // Postgres doesn't like create table failing in the same transaction
0353: // as other operations
0354:
0355: TransactionManagerStrategy tms = new TransactionManagerStrategy();
0356: tms.startTX();
0357: Connection c = null;
0358: PreparedStatement stmt = null;
0359: ResultSet rs = null;
0360: boolean threadWasInterrupted = Thread.interrupted();
0361: try {
0362: c = this .getConnection();
0363:
0364: // Find out what the next TXID should be
0365: stmt = c.prepareStatement(SELECT_MAX_TX);
0366: rs = stmt.executeQuery();
0367: if (rs.next())
0368: nextTransactionId.set(rs.getLong(1) + 1);
0369: rs.close();
0370: rs = null;
0371: stmt.close();
0372: stmt = null;
0373:
0374: // Delete all the temporary messages.
0375: stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES);
0376: stmt.executeUpdate();
0377: stmt.close();
0378: stmt = null;
0379:
0380: // Delete all the messages that were added but thier tx's were not commited.
0381: String deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX;
0382: if (xaRecovery)
0383: deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY;
0384: stmt = c.prepareStatement(deleteMarkedMessagesWithTx);
0385: stmt.setString(1, "A");
0386: stmt.executeUpdate();
0387: stmt.close();
0388: stmt = null;
0389:
0390: // Restore all the messages that were removed but their tx's were not commited.
0391: String updateMarkedMessages = UPDATE_MARKED_MESSAGES;
0392: if (xaRecovery)
0393: updateMarkedMessages = UPDATE_MARKED_MESSAGES_XARECOVERY;
0394: stmt = c.prepareStatement(updateMarkedMessages);
0395: stmt.setNull(1, java.sql.Types.BIGINT);
0396: stmt.setString(2, "A");
0397: stmt.setString(3, "D");
0398: stmt.executeUpdate();
0399: stmt.close();
0400: stmt = null;
0401:
0402: // Now recovery is complete, clear the transaction table.
0403: String deleteAllTx = DELETE_ALL_TX;
0404: if (xaRecovery)
0405: deleteAllTx = DELETE_ALL_TX_XARECOVERY;
0406: stmt = c.prepareStatement(deleteAllTx);
0407: stmt.execute();
0408: stmt.close();
0409: stmt = null;
0410:
0411: // If we are doing XARecovery restore the prepared transactions
0412: if (xaRecovery) {
0413: stmt = c.prepareStatement(SELECT_ALL_TX_XARECOVERY);
0414: rs = stmt.executeQuery();
0415: while (rs.next()) {
0416: long txid = rs.getLong(1);
0417: Xid xid = extractXid(rs, 2);
0418: Tx tx = new Tx(txid);
0419: tx.setXid(xid);
0420: tx.checkPersisted();
0421: txManager.restoreTx(tx);
0422: }
0423: rs.close();
0424: rs = null;
0425: stmt.close();
0426: stmt = null;
0427: }
0428: } catch (Exception e) {
0429: tms.setRollbackOnly();
0430: throw new SpyJMSException(
0431: "Could not resolve uncommited transactions. Message recovery may not be accurate",
0432: e);
0433: } finally {
0434: try {
0435: if (rs != null)
0436: rs.close();
0437: } catch (Throwable ignore) {
0438: }
0439: try {
0440: if (stmt != null)
0441: stmt.close();
0442: } catch (Throwable ignore) {
0443: }
0444: try {
0445: if (c != null)
0446: c.close();
0447: } catch (Throwable ignore) {
0448: }
0449: tms.endTX();
0450:
0451: // Restore the interrupted state of the thread
0452: if (threadWasInterrupted)
0453: Thread.currentThread().interrupt();
0454: }
0455: }
0456:
0457: /////////////////////////////////////////////////////////////////////////////////
0458: //
0459: // Message Recovery
0460: //
0461: /////////////////////////////////////////////////////////////////////////////////
0462:
0463: synchronized public void restoreQueue(JMSDestination jmsDest,
0464: SpyDestination dest) throws JMSException {
0465: if (jmsDest == null)
0466: throw new IllegalArgumentException(
0467: "Must supply non null JMSDestination to restoreQueue");
0468: if (dest == null)
0469: throw new IllegalArgumentException(
0470: "Must supply non null SpyDestination to restoreQueue");
0471:
0472: boolean canOverrideTimeout = (tm instanceof TransactionTimeoutConfiguration);
0473: int previousTimeout = 0;
0474: try {
0475: // Set our timeout
0476: if (recoveryTimeout != 0) {
0477: if (canOverrideTimeout) {
0478: previousTimeout = ((TransactionTimeoutConfiguration) tm)
0479: .getTransactionTimeout();
0480: tm.setTransactionTimeout(recoveryTimeout);
0481: } else {
0482: log
0483: .debug("Cannot override recovery timeout, TransactionManager does implement "
0484: + TransactionTimeoutConfiguration.class
0485: .getName());
0486: }
0487: }
0488:
0489: // restore the queue
0490: try {
0491: internalRestoreQueue(jmsDest, dest);
0492: } finally {
0493: // restore the transaction timeout
0494: if (recoveryTimeout != 0 && canOverrideTimeout)
0495: tm.setTransactionTimeout(previousTimeout);
0496: }
0497: } catch (Exception e) {
0498: SpyJMSException.rethrowAsJMSException(
0499: "Unexpected error in recovery", e);
0500: }
0501: }
0502:
0503: synchronized protected void internalRestoreQueue(
0504: JMSDestination jmsDest, SpyDestination dest)
0505: throws JMSException {
0506: // Work out the prepared transactions
0507: Map prepared = null;
0508: if (xaRecovery) {
0509: prepared = new HashMap();
0510: Map map = txManager.getPreparedTransactions();
0511: for (Iterator i = map.values().iterator(); i.hasNext();) {
0512: TxManager.PreparedInfo info = (TxManager.PreparedInfo) i
0513: .next();
0514: for (Iterator j = info.getTxids().iterator(); j
0515: .hasNext();) {
0516: Tx tx = (Tx) j.next();
0517: prepared.put(new Long(tx.longValue()), tx);
0518: }
0519: }
0520: }
0521:
0522: TransactionManagerStrategy tms = new TransactionManagerStrategy();
0523: tms.startTX();
0524: Connection c = null;
0525: PreparedStatement stmt = null;
0526: PreparedStatement stmt2 = null;
0527: ResultSet rs = null;
0528: boolean threadWasInterrupted = Thread.interrupted();
0529: try {
0530: String selectMessagesInDest = SELECT_MESSAGES_IN_DEST;
0531: String selectMessage = SELECT_MESSAGE;
0532: if (xaRecovery) {
0533: selectMessagesInDest = SELECT_MESSAGES_IN_DEST_XARECOVERY;
0534: selectMessage = SELECT_MESSAGE_XARECOVERY;
0535: }
0536: c = this .getConnection();
0537: if (recoverMessagesChunk == 0)
0538: stmt = c.prepareStatement(selectMessagesInDest);
0539: else {
0540: stmt = c.prepareStatement(SELECT_MESSAGE_KEYS_IN_DEST);
0541: stmt2 = c.prepareStatement(selectMessage);
0542: }
0543: stmt.setString(1, dest.toString());
0544:
0545: long txid = 0;
0546: String txop = null;
0547: rs = stmt.executeQuery();
0548: int counter = 0;
0549: int recovery = 0;
0550: while (rs.next()) {
0551: long msgid = rs.getLong(1);
0552: SpyMessage message = null;
0553: if (recoverMessagesChunk == 0) {
0554: message = extractMessage(rs);
0555: if (xaRecovery) {
0556: txid = rs.getLong(3);
0557: txop = rs.getString(4);
0558: }
0559: } else {
0560: ResultSet rs2 = null;
0561: try {
0562: stmt2.setLong(1, msgid);
0563: stmt2.setString(2, dest.toString());
0564: rs2 = stmt2.executeQuery();
0565: if (rs2.next()) {
0566: message = extractMessage(rs2);
0567: if (xaRecovery) {
0568: txid = rs.getLong(3);
0569: txop = rs.getString(4);
0570: }
0571: } else
0572: log.warn("Failed to find message msgid="
0573: + msgid + " dest=" + dest);
0574: } finally {
0575: if (rs2 != null) {
0576: try {
0577: rs2.close();
0578: } catch (Exception ignored) {
0579: }
0580: }
0581: }
0582: }
0583: // The durable subscription is not serialized
0584: if (dest instanceof SpyTopic)
0585: message.header.durableSubscriberID = ((SpyTopic) dest)
0586: .getDurableSubscriptionID();
0587:
0588: if (xaRecovery == false || txid == 0 || txop == null)
0589: jmsDest.restoreMessage(message);
0590: else {
0591: Tx tx = (Tx) prepared.get(new Long(txid));
0592: if (tx == null)
0593: jmsDest.restoreMessage(message);
0594: else if ("A".equals(txop)) {
0595: jmsDest.restoreMessage(message, tx, Tx.ADD);
0596: recovery++;
0597: } else if ("D".equals(txop)) {
0598: jmsDest.restoreMessage(message, tx, Tx.REMOVE);
0599: recovery++;
0600: } else
0601: throw new IllegalStateException("Unknown txop="
0602: + txop + " for msg=" + msgid + " dest="
0603: + dest);
0604: }
0605: counter++;
0606: }
0607:
0608: log.debug("Restored " + counter + " message(s) to: " + dest
0609: + " " + recovery + " need recovery.");
0610: } catch (IOException e) {
0611: tms.setRollbackOnly();
0612: throw new SpyJMSException(
0613: "Could not restore messages to destination : "
0614: + dest.toString(), e);
0615: } catch (SQLException e) {
0616: tms.setRollbackOnly();
0617: throw new SpyJMSException(
0618: "Could not restore messages to destination : "
0619: + dest.toString(), e);
0620: } finally {
0621: try {
0622: if (rs != null)
0623: rs.close();
0624: } catch (Throwable ignore) {
0625: }
0626: try {
0627: if (stmt != null)
0628: stmt.close();
0629: } catch (Throwable ignore) {
0630: }
0631: try {
0632: if (c != null)
0633: c.close();
0634: } catch (Throwable ignore) {
0635: }
0636: tms.endTX();
0637:
0638: // Restore the interrupted state of the thread
0639: if (threadWasInterrupted)
0640: Thread.currentThread().interrupt();
0641: }
0642:
0643: }
0644:
0645: SpyMessage extractMessage(ResultSet rs) throws SQLException,
0646: IOException {
0647: try {
0648: long messageid = rs.getLong(1);
0649:
0650: SpyMessage message = null;
0651:
0652: if (blobType == OBJECT_BLOB) {
0653:
0654: message = (SpyMessage) rs.getObject(2);
0655:
0656: } else if (blobType == BYTES_BLOB) {
0657:
0658: byte[] st = rs.getBytes(2);
0659: ByteArrayInputStream baip = new ByteArrayInputStream(st);
0660: ObjectInputStream ois = new ObjectInputStream(baip);
0661: message = SpyMessage.readMessage(ois);
0662:
0663: } else if (blobType == BINARYSTREAM_BLOB) {
0664:
0665: ObjectInputStream ois = new ObjectInputStream(rs
0666: .getBinaryStream(2));
0667: message = SpyMessage.readMessage(ois);
0668:
0669: } else if (blobType == BLOB_BLOB) {
0670:
0671: ObjectInputStream ois = new ObjectInputStream(rs
0672: .getBlob(2).getBinaryStream());
0673: message = SpyMessage.readMessage(ois);
0674: }
0675:
0676: message.header.messageId = messageid;
0677: return message;
0678: } catch (StreamCorruptedException e) {
0679: throw new IOException("Could not load the message: " + e);
0680: }
0681: }
0682:
0683: Xid extractXid(ResultSet rs, int column) throws SQLException,
0684: IOException, ClassNotFoundException {
0685: try {
0686: Xid xid = null;
0687:
0688: if (blobType == OBJECT_BLOB) {
0689: xid = (Xid) rs.getObject(column);
0690: } else if (blobType == BYTES_BLOB) {
0691: byte[] st = rs.getBytes(column);
0692: ByteArrayInputStream baip = new ByteArrayInputStream(st);
0693: ObjectInputStream ois = new ObjectInputStream(baip);
0694: xid = (Xid) ois.readObject();
0695: } else if (blobType == BINARYSTREAM_BLOB) {
0696: ObjectInputStream ois = new ObjectInputStream(rs
0697: .getBinaryStream(column));
0698: xid = (Xid) ois.readObject();
0699: } else if (blobType == BLOB_BLOB) {
0700: ObjectInputStream ois = new ObjectInputStream(rs
0701: .getBlob(column).getBinaryStream());
0702: xid = (Xid) ois.readObject();
0703: }
0704:
0705: return xid;
0706: } catch (StreamCorruptedException e) {
0707: throw new IOException("Could not load the message: " + e);
0708: }
0709: }
0710:
0711: /////////////////////////////////////////////////////////////////////////////////
0712: //
0713: // TX Commit
0714: //
0715: /////////////////////////////////////////////////////////////////////////////////
0716: public void commitPersistentTx(Tx txId)
0717: throws javax.jms.JMSException {
0718: if (txId.wasPersisted() == false)
0719: return;
0720:
0721: TransactionManagerStrategy tms = new TransactionManagerStrategy();
0722: tms.startTX();
0723: Connection c = null;
0724: boolean threadWasInterrupted = Thread.interrupted();
0725: try {
0726:
0727: c = this .getConnection();
0728: removeMarkedMessages(c, txId, "D");
0729: removeTXRecord(c, txId.longValue());
0730:
0731: } catch (SQLException e) {
0732: tms.setRollbackOnly();
0733: throw new SpyJMSException("Could not commit tx: " + txId, e);
0734: } finally {
0735: try {
0736: if (c != null)
0737: c.close();
0738: } catch (Throwable ignore) {
0739: }
0740: tms.endTX();
0741:
0742: // Restore the interrupted state of the thread
0743: if (threadWasInterrupted)
0744: Thread.currentThread().interrupt();
0745: }
0746: }
0747:
0748: public void removeMarkedMessages(Connection c, Tx txid, String mark)
0749: throws SQLException {
0750: PreparedStatement stmt = null;
0751: try {
0752: stmt = c.prepareStatement(DELETE_MARKED_MESSAGES);
0753: stmt.setLong(1, txid.longValue());
0754: stmt.setString(2, mark);
0755: stmt.executeUpdate();
0756: } finally {
0757: try {
0758: if (stmt != null)
0759: stmt.close();
0760: } catch (Throwable e) {
0761: }
0762: }
0763: }
0764:
0765: public void addTXRecord(Connection c, Tx txid) throws SQLException,
0766: IOException {
0767: PreparedStatement stmt = null;
0768: try {
0769: String insertTx = INSERT_TX;
0770: if (xaRecovery)
0771: insertTx = INSERT_TX_XARECOVERY;
0772: stmt = c.prepareStatement(insertTx);
0773: stmt.setLong(1, txid.longValue());
0774: if (xaRecovery) {
0775: Xid xid = txid.getXid();
0776: if (xid != null)
0777: setBlob(stmt, 2, xid);
0778: else
0779: stmt.setNull(2, java.sql.Types.BLOB);
0780: }
0781: stmt.executeUpdate();
0782: } finally {
0783: try {
0784: if (stmt != null)
0785: stmt.close();
0786: } catch (Throwable e) {
0787: }
0788: }
0789: }
0790:
0791: public void removeTXRecord(Connection c, long txid)
0792: throws SQLException {
0793: PreparedStatement stmt = null;
0794: try {
0795: stmt = c.prepareStatement(DELETE_TX);
0796: stmt.setLong(1, txid);
0797: stmt.executeUpdate();
0798: } finally {
0799: try {
0800: if (stmt != null)
0801: stmt.close();
0802: } catch (Throwable e) {
0803: }
0804: }
0805: }
0806:
0807: /////////////////////////////////////////////////////////////////////////////////
0808: //
0809: // TX Rollback
0810: //
0811: /////////////////////////////////////////////////////////////////////////////////
0812: public void rollbackPersistentTx(Tx txId) throws JMSException {
0813: if (txId.wasPersisted() == false)
0814: return;
0815:
0816: TransactionManagerStrategy tms = new TransactionManagerStrategy();
0817: tms.startTX();
0818: Connection c = null;
0819: PreparedStatement stmt = null;
0820: boolean threadWasInterrupted = Thread.interrupted();
0821: try {
0822:
0823: c = this .getConnection();
0824: removeMarkedMessages(c, txId, "A");
0825: removeTXRecord(c, txId.longValue());
0826:
0827: // Restore all the messages that were logically removed.
0828: stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX);
0829: stmt.setNull(1, java.sql.Types.BIGINT);
0830: stmt.setString(2, "A");
0831: stmt.setString(3, "D");
0832: stmt.setLong(4, txId.longValue());
0833: stmt.executeUpdate();
0834: stmt.close();
0835: stmt = null;
0836: } catch (SQLException e) {
0837: tms.setRollbackOnly();
0838: throw new SpyJMSException("Could not rollback tx: " + txId,
0839: e);
0840: } finally {
0841: try {
0842: if (stmt != null)
0843: stmt.close();
0844: } catch (Throwable ignore) {
0845: }
0846: try {
0847: if (c != null)
0848: c.close();
0849: } catch (Throwable ignore) {
0850: }
0851: tms.endTX();
0852:
0853: // Restore the interrupted state of the thread
0854: if (threadWasInterrupted)
0855: Thread.currentThread().interrupt();
0856: }
0857:
0858: }
0859:
0860: /////////////////////////////////////////////////////////////////////////////////
0861: //
0862: // TX Creation
0863: //
0864: /////////////////////////////////////////////////////////////////////////////////
0865: public Tx createPersistentTx() throws JMSException {
0866: Tx id = new Tx(nextTransactionId.increment());
0867: return id;
0868: }
0869:
0870: public void insertPersistentTx(TransactionManagerStrategy tms,
0871: Connection c, Tx tx) throws JMSException {
0872: try {
0873: if (tx != null && tx.checkPersisted() == false)
0874: addTXRecord(c, tx);
0875: } catch (Exception e) {
0876: tms.setRollbackOnly();
0877: throw new SpyJMSException("Could not create tx: "
0878: + tx.longValue(), e);
0879: }
0880: }
0881:
0882: /////////////////////////////////////////////////////////////////////////////////
0883: //
0884: // Adding a message
0885: //
0886: /////////////////////////////////////////////////////////////////////////////////
0887: public void add(MessageReference messageRef, Tx txId)
0888: throws javax.jms.JMSException {
0889: boolean trace = log.isTraceEnabled();
0890: if (trace)
0891: log.trace("About to add message " + messageRef
0892: + " transaction=" + txId);
0893:
0894: TransactionManagerStrategy tms = new TransactionManagerStrategy();
0895: tms.startTX();
0896: Connection c = null;
0897: boolean threadWasInterrupted = Thread.interrupted();
0898: try {
0899: c = this .getConnection();
0900:
0901: // Lazily write the peristent transaction
0902: insertPersistentTx(tms, c, txId);
0903:
0904: // Synchronize on the message to avoid a race with the softener
0905: synchronized (messageRef) {
0906: SpyMessage message = messageRef.getMessage();
0907:
0908: // has it allready been stored by the message cache interface??
0909: if (messageRef.stored == MessageReference.STORED) {
0910: if (trace)
0911: log.trace("Updating message " + messageRef
0912: + " transaction=" + txId);
0913:
0914: markMessage(c, messageRef.messageId, messageRef
0915: .getPersistentKey(), txId, "A");
0916: } else {
0917: if (trace)
0918: log.trace("Inserting message " + messageRef
0919: + " transaction=" + txId);
0920:
0921: add(c, messageRef.getPersistentKey(), message,
0922: txId, "A");
0923: messageRef.setStored(MessageReference.STORED);
0924: }
0925: if (trace)
0926: log.trace("Added message " + messageRef
0927: + " transaction=" + txId);
0928: }
0929: } catch (IOException e) {
0930: tms.setRollbackOnly();
0931: throw new SpyJMSException("Could not store message: "
0932: + messageRef, e);
0933: } catch (SQLException e) {
0934: tms.setRollbackOnly();
0935: throw new SpyJMSException("Could not store message: "
0936: + messageRef, e);
0937: } finally {
0938: try {
0939: if (c != null)
0940: c.close();
0941: } catch (Throwable ignore) {
0942: }
0943: tms.endTX();
0944:
0945: // Restore the interrupted state of the thread
0946: if (threadWasInterrupted)
0947: Thread.currentThread().interrupt();
0948: }
0949: }
0950:
0951: protected void add(Connection c, String queue, SpyMessage message,
0952: Tx txId, String mark) throws SQLException, IOException {
0953: PreparedStatement stmt = null;
0954: try {
0955:
0956: stmt = c.prepareStatement(INSERT_MESSAGE);
0957:
0958: stmt.setLong(1, message.header.messageId);
0959: stmt.setString(2, queue);
0960: setBlob(stmt, 3, message);
0961:
0962: if (txId != null)
0963: stmt.setLong(4, txId.longValue());
0964: else
0965: stmt.setNull(4, java.sql.Types.BIGINT);
0966: stmt.setString(5, mark);
0967:
0968: stmt.executeUpdate();
0969: } finally {
0970: try {
0971: if (stmt != null)
0972: stmt.close();
0973: } catch (Throwable ignore) {
0974: }
0975: }
0976: }
0977:
0978: public void markMessage(Connection c, long messageid,
0979: String destination, Tx txId, String mark)
0980: throws SQLException {
0981: PreparedStatement stmt = null;
0982: try {
0983:
0984: stmt = c.prepareStatement(MARK_MESSAGE);
0985: if (txId == null) {
0986: stmt.setNull(1, java.sql.Types.BIGINT);
0987: } else {
0988: stmt.setLong(1, txId.longValue());
0989: }
0990: stmt.setString(2, mark);
0991: stmt.setLong(3, messageid);
0992: stmt.setString(4, destination);
0993: stmt.executeUpdate();
0994: } finally {
0995: try {
0996: if (stmt != null)
0997: stmt.close();
0998: } catch (Throwable ignore) {
0999: }
1000: }
1001:
1002: }
1003:
1004: public void setBlob(PreparedStatement stmt, int column,
1005: SpyMessage message) throws IOException, SQLException {
1006: if (blobType == OBJECT_BLOB) {
1007: stmt.setObject(column, message);
1008: } else if (blobType == BYTES_BLOB) {
1009: ByteArrayOutputStream baos = new ByteArrayOutputStream();
1010: ObjectOutputStream oos = new ObjectOutputStream(baos);
1011: SpyMessage.writeMessage(message, oos);
1012: oos.flush();
1013: byte[] messageAsBytes = baos.toByteArray();
1014: stmt.setBytes(column, messageAsBytes);
1015: } else if (blobType == BINARYSTREAM_BLOB) {
1016: ByteArrayOutputStream baos = new ByteArrayOutputStream();
1017: ObjectOutputStream oos = new ObjectOutputStream(baos);
1018: SpyMessage.writeMessage(message, oos);
1019: oos.flush();
1020: byte[] messageAsBytes = baos.toByteArray();
1021: ByteArrayInputStream bais = new ByteArrayInputStream(
1022: messageAsBytes);
1023: stmt.setBinaryStream(column, bais, messageAsBytes.length);
1024: } else if (blobType == BLOB_BLOB) {
1025:
1026: throw new RuntimeException(
1027: "BLOB_TYPE: BLOB_BLOB is not yet implemented.");
1028: /** TODO:
1029: ByteArrayOutputStream baos= new ByteArrayOutputStream();
1030: ObjectOutputStream oos= new ObjectOutputStream(baos);
1031: oos.writeObject(message);
1032: byte[] messageAsBytes= baos.toByteArray();
1033: ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
1034: stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
1035: */
1036: }
1037: }
1038:
1039: public void setBlob(PreparedStatement stmt, int column, Xid xid)
1040: throws IOException, SQLException {
1041: if (blobType == OBJECT_BLOB) {
1042: stmt.setObject(column, xid);
1043: } else if (blobType == BYTES_BLOB) {
1044: ByteArrayOutputStream baos = new ByteArrayOutputStream();
1045: ObjectOutputStream oos = new ObjectOutputStream(baos);
1046: oos.writeObject(xid);
1047: oos.flush();
1048: byte[] messageAsBytes = baos.toByteArray();
1049: stmt.setBytes(column, messageAsBytes);
1050: } else if (blobType == BINARYSTREAM_BLOB) {
1051: ByteArrayOutputStream baos = new ByteArrayOutputStream();
1052: ObjectOutputStream oos = new ObjectOutputStream(baos);
1053: oos.writeObject(xid);
1054: oos.flush();
1055: byte[] messageAsBytes = baos.toByteArray();
1056: ByteArrayInputStream bais = new ByteArrayInputStream(
1057: messageAsBytes);
1058: stmt.setBinaryStream(column, bais, messageAsBytes.length);
1059: } else if (blobType == BLOB_BLOB) {
1060:
1061: throw new RuntimeException(
1062: "BLOB_TYPE: BLOB_BLOB is not yet implemented.");
1063: /** TODO:
1064: ByteArrayOutputStream baos= new ByteArrayOutputStream();
1065: ObjectOutputStream oos= new ObjectOutputStream(baos);
1066: oos.writeObject(xid);
1067: byte[] messageAsBytes= baos.toByteArray();
1068: ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
1069: stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
1070: */
1071: }
1072: }
1073:
1074: /////////////////////////////////////////////////////////////////////////////////
1075: //
1076: // Updating a message
1077: //
1078: /////////////////////////////////////////////////////////////////////////////////
1079: public void update(MessageReference messageRef, Tx txId)
1080: throws javax.jms.JMSException {
1081: boolean trace = log.isTraceEnabled();
1082: if (trace)
1083: log.trace("Updating message " + messageRef
1084: + " transaction=" + txId);
1085:
1086: TransactionManagerStrategy tms = new TransactionManagerStrategy();
1087: tms.startTX();
1088: Connection c = null;
1089: PreparedStatement stmt = null;
1090: boolean threadWasInterrupted = Thread.interrupted();
1091: try {
1092:
1093: c = this .getConnection();
1094: if (txId == null) {
1095:
1096: stmt = c.prepareStatement(UPDATE_MESSAGE);
1097: setBlob(stmt, 1, messageRef.getMessage());
1098: stmt.setLong(2, messageRef.messageId);
1099: stmt.setString(3, messageRef.getPersistentKey());
1100: int rc = stmt.executeUpdate();
1101: if (rc != 1)
1102: throw new SpyJMSException(
1103: "Could not update the message in the database: update affected "
1104: + rc + " rows");
1105: } else {
1106: throw new SpyJMSException(
1107: "NYI: Updating a message in a transaction is not currently used");
1108: }
1109: if (trace)
1110: log.trace("Updated message " + messageRef
1111: + " transaction=" + txId);
1112:
1113: } catch (IOException e) {
1114: tms.setRollbackOnly();
1115: throw new SpyJMSException("Could not update message: "
1116: + messageRef, e);
1117: } catch (SQLException e) {
1118: tms.setRollbackOnly();
1119: throw new SpyJMSException("Could not update message: "
1120: + messageRef, e);
1121: } finally {
1122: try {
1123: if (stmt != null)
1124: stmt.close();
1125: } catch (Throwable ignore) {
1126: }
1127: try {
1128: if (c != null)
1129: c.close();
1130: } catch (Throwable ignore) {
1131: }
1132: tms.endTX();
1133:
1134: // Restore the interrupted state of the thread
1135: if (threadWasInterrupted)
1136: Thread.currentThread().interrupt();
1137: }
1138:
1139: }
1140:
1141: /////////////////////////////////////////////////////////////////////////////////
1142: //
1143: // Removing a message
1144: //
1145: /////////////////////////////////////////////////////////////////////////////////
1146: public void remove(MessageReference messageRef, Tx txId)
1147: throws javax.jms.JMSException {
1148: boolean trace = log.isTraceEnabled();
1149: if (trace)
1150: log.trace("Removing message " + messageRef
1151: + " transaction=" + txId);
1152:
1153: TransactionManagerStrategy tms = new TransactionManagerStrategy();
1154: tms.startTX();
1155: Connection c = null;
1156: PreparedStatement stmt = null;
1157: boolean threadWasInterrupted = Thread.interrupted();
1158: try {
1159: c = this .getConnection();
1160:
1161: // Lazily write the peristent transaction
1162: insertPersistentTx(tms, c, txId);
1163:
1164: // Synchronize on the message to avoid a race with the softener
1165: synchronized (messageRef) {
1166: if (txId == null) {
1167: stmt = c.prepareStatement(DELETE_MESSAGE);
1168: stmt.setLong(1, messageRef.messageId);
1169: stmt.setString(2, messageRef.getPersistentKey());
1170:
1171: // Adrian Brock:
1172: // Remove the message from the cache, but don't
1173: // return it to the pool just yet. The queue still holds
1174: // a reference to the message and will return it
1175: // to the pool once it gets enough time slice.
1176: // The alternative is to remove the validation
1177: // for double removal from the cache,
1178: // which I don't want to do because it is useful
1179: // for spotting errors
1180: messageRef.setStored(MessageReference.NOT_STORED);
1181: messageRef.removeDelayed();
1182: } else {
1183: stmt = c.prepareStatement(MARK_MESSAGE);
1184: stmt.setLong(1, txId.longValue());
1185: stmt.setString(2, "D");
1186: stmt.setLong(3, messageRef.messageId);
1187: stmt.setString(4, messageRef.getPersistentKey());
1188: }
1189:
1190: int tries = 0;
1191: while (true) {
1192: try {
1193: int rc = stmt.executeUpdate();
1194:
1195: if (tries > 0) {
1196: if (rc != 1)
1197: throw new SpyJMSException(
1198: "Could not mark the message as deleted in the database: update affected "
1199: + rc + " rows."
1200: + CONCURRENCY_WARNING);
1201:
1202: log.warn("Remove operation worked after "
1203: + tries + " retries");
1204: }
1205: break;
1206: } catch (SQLException e) {
1207: log.warn(
1208: "SQLException caught - assuming deadlock detected, try:"
1209: + (tries + 1), e);
1210: tries++;
1211: if (tries >= statementRetries) {
1212: log.error("Retried " + tries
1213: + " times, now giving up");
1214: throw new IllegalStateException(
1215: "Could not remove message after "
1216: + tries + "attempts");
1217: }
1218: log.warn("Trying again after a pause");
1219: //Now we wait for a random amount of time to minimise risk of deadlock
1220: Thread.sleep((long) (Math.random() * 500));
1221: }
1222: }
1223:
1224: if (trace)
1225: log.trace("Removed message " + messageRef
1226: + " transaction=" + txId);
1227: }
1228: } catch (Exception e) {
1229: tms.setRollbackOnly();
1230: throw new SpyJMSException("Could not remove message: "
1231: + messageRef, e);
1232: } finally {
1233: try {
1234: if (stmt != null)
1235: stmt.close();
1236: } catch (Throwable ignore) {
1237: }
1238: try {
1239: if (c != null)
1240: c.close();
1241: } catch (Throwable ignore) {
1242: }
1243: tms.endTX();
1244:
1245: // Restore the interrupted state of the thread
1246: if (threadWasInterrupted)
1247: Thread.currentThread().interrupt();
1248: }
1249:
1250: }
1251:
1252: /////////////////////////////////////////////////////////////////////////////////
1253: //
1254: // Misc. PM functions
1255: //
1256: /////////////////////////////////////////////////////////////////////////////////
1257:
1258: public TxManager getTxManager() {
1259: return txManager;
1260: }
1261:
/**
 * Closes a queue. This implementation does nothing: it keeps no per-queue
 * state, because all state lives in the database.
 *
 * @param jmsDest not used
 * @param dest not used
 * @throws JMSException declared by the signature; never thrown here
 */
public void closeQueue(JMSDestination jmsDest, SpyDestination dest)
throws JMSException {
// Nothing to clean up, all the state is in the db.
}
1266:
1267: public SpyMessage loadFromStorage(MessageReference messageRef)
1268: throws JMSException {
1269: if (log.isTraceEnabled())
1270: log.trace("Loading message from storage " + messageRef);
1271:
1272: TransactionManagerStrategy tms = new TransactionManagerStrategy();
1273: tms.startTX();
1274: Connection c = null;
1275: PreparedStatement stmt = null;
1276: ResultSet rs = null;
1277: boolean threadWasInterrupted = Thread.interrupted();
1278: try {
1279:
1280: c = this .getConnection();
1281: stmt = c.prepareStatement(SELECT_MESSAGE);
1282: stmt.setLong(1, messageRef.messageId);
1283: stmt.setString(2, messageRef.getPersistentKey());
1284:
1285: rs = stmt.executeQuery();
1286: if (rs.next())
1287: return extractMessage(rs);
1288: else
1289: throw new SpyJMSException(
1290: "Could not load message from storage: "
1291: + messageRef + " "
1292: + CONCURRENCY_WARNING);
1293:
1294: } catch (Exception e) {
1295: tms.setRollbackOnly();
1296: SpyJMSException.rethrowAsJMSException(
1297: "Could not load message : " + messageRef, e);
1298: throw new UnreachableStatementException();
1299: } finally {
1300: try {
1301: if (rs != null)
1302: rs.close();
1303: } catch (Throwable ignore) {
1304: }
1305: try {
1306: if (stmt != null)
1307: stmt.close();
1308: } catch (Throwable ignore) {
1309: }
1310: try {
1311: if (c != null)
1312: c.close();
1313: } catch (Throwable ignore) {
1314: }
1315: tms.endTX();
1316:
1317: // Restore the interrupted state of the thread
1318: if (threadWasInterrupted)
1319: Thread.currentThread().interrupt();
1320: }
1321: }
1322:
1323: /////////////////////////////////////////////////////////////////////////////////
1324: //
1325: // CacheStore Functions
1326: //
1327: /////////////////////////////////////////////////////////////////////////////////
1328: public void removeFromStorage(MessageReference messageRef)
1329: throws JMSException {
1330: // We don't remove persistent messages sent to persistent queues
1331: if (messageRef.isPersistent())
1332: return;
1333:
1334: boolean trace = log.isTraceEnabled();
1335: if (trace)
1336: log.trace("Removing message from storage " + messageRef);
1337:
1338: TransactionManagerStrategy tms = new TransactionManagerStrategy();
1339: tms.startTX();
1340: Connection c = null;
1341: PreparedStatement stmt = null;
1342: boolean threadWasInterrupted = Thread.interrupted();
1343: try {
1344: c = this .getConnection();
1345: stmt = c.prepareStatement(DELETE_MESSAGE);
1346: stmt.setLong(1, messageRef.messageId);
1347: stmt.setString(2, messageRef.getPersistentKey());
1348: stmt.executeUpdate();
1349: messageRef.setStored(MessageReference.NOT_STORED);
1350:
1351: if (trace)
1352: log.trace("Removed message from storage " + messageRef);
1353: } catch (SQLException e) {
1354: tms.setRollbackOnly();
1355: throw new SpyJMSException("Could not remove message: "
1356: + messageRef, e);
1357: } finally {
1358: try {
1359: if (stmt != null)
1360: stmt.close();
1361: } catch (Throwable ignore) {
1362: }
1363: try {
1364: if (c != null)
1365: c.close();
1366: } catch (Throwable ignore) {
1367: }
1368: tms.endTX();
1369:
1370: // Restore the interrupted state of the thread
1371: if (threadWasInterrupted)
1372: Thread.currentThread().interrupt();
1373: }
1374: }
1375:
1376: public void saveToStorage(MessageReference messageRef,
1377: SpyMessage message) throws JMSException {
1378: // Ignore save operations for persistent messages sent to persistent queues
1379: // The queues handle the persistence
1380: if (messageRef.isPersistent())
1381: return;
1382:
1383: boolean trace = log.isTraceEnabled();
1384: if (trace)
1385: log.trace("Saving message to storage " + messageRef);
1386:
1387: TransactionManagerStrategy tms = new TransactionManagerStrategy();
1388: tms.startTX();
1389: Connection c = null;
1390: boolean threadWasInterrupted = Thread.interrupted();
1391: try {
1392:
1393: c = this .getConnection();
1394: add(c, messageRef.getPersistentKey(), message, null, "T");
1395: messageRef.setStored(MessageReference.STORED);
1396:
1397: if (trace)
1398: log.trace("Saved message to storage " + messageRef);
1399: } catch (IOException e) {
1400: tms.setRollbackOnly();
1401: throw new SpyJMSException("Could not store message: "
1402: + messageRef, e);
1403: } catch (SQLException e) {
1404: tms.setRollbackOnly();
1405: throw new SpyJMSException("Could not store message: "
1406: + messageRef, e);
1407: } finally {
1408: try {
1409: if (c != null)
1410: c.close();
1411: } catch (Throwable ignore) {
1412: }
1413: tms.endTX();
1414:
1415: // Restore the interrupted state of the thread
1416: if (threadWasInterrupted)
1417: Thread.currentThread().interrupt();
1418: }
1419: }
1420:
1421: /**
1422: * Gets a connection from the datasource, retrying as needed. This was
1423: * implemented because in some minimal configurations (i.e. little logging
1424: * and few services) the database wasn't ready when we tried to get a
1425: * connection. We, therefore, implement a retry loop wich is controled
1426: * by the ConnectionRetryAttempts attribute. Submitted by terry@amicas.com
1427: *
1428: * @return the connection
1429: * @exception SQLException if an error occurs.
1430: */
1431: protected Connection getConnection() throws SQLException {
1432: int attempts = this .connectionRetryAttempts;
1433: int attemptCount = 0;
1434: SQLException sqlException = null;
1435: while (attempts-- > 0) {
1436: if (++attemptCount > 1) {
1437: log.debug("Retrying connection: attempt # "
1438: + attemptCount);
1439: }
1440: try {
1441: sqlException = null;
1442: return datasource.getConnection();
1443: } catch (SQLException exception) {
1444: log.debug("Connection attempt # " + attemptCount
1445: + " failed with SQLException", exception);
1446: sqlException = exception;
1447: } finally {
1448: if (sqlException == null && attemptCount > 1) {
1449: log.debug("Connection succeeded on attempt # "
1450: + attemptCount);
1451: }
1452: }
1453:
1454: if (attempts > 0) {
1455: try {
1456: Thread.sleep(1500);
1457: } catch (InterruptedException interruptedException) {
1458: break;
1459: }
1460: }
1461: }
1462: if (sqlException != null) {
1463: throw sqlException;
1464: }
1465: throw new SQLException("connection attempt interrupted");
1466: }
1467:
1468: /////////////////////////////////////////////////////////////////////////////////
1469: //
1470: // JMX Interface
1471: //
1472: /////////////////////////////////////////////////////////////////////////////////
1473:
/**
 * The JMX object name of the connection manager MBean. Its "BindName"
 * attribute is read in initializeFields() and used as the JNDI name under
 * which the DataSource is looked up.
 */
protected ObjectName connectionManagerName;

/**
 * The SQL properties. startService() reads the SQL statement overrides from
 * here, together with the CREATE_TABLES_ON_STARTUP and BLOB_TYPE options.
 */
protected Properties sqlProperties = new Properties();
1479:
1480: public void startService() throws Exception {
1481: UPDATE_MARKED_MESSAGES = sqlProperties.getProperty(
1482: "UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES);
1483: UPDATE_MARKED_MESSAGES_XARECOVERY = sqlProperties.getProperty(
1484: "UPDATE_MARKED_MESSAGES_XARECOVERY",
1485: UPDATE_MARKED_MESSAGES_XARECOVERY);
1486: UPDATE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty(
1487: "UPDATE_MARKED_MESSAGES_WITH_TX",
1488: UPDATE_MARKED_MESSAGES_WITH_TX);
1489: DELETE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty(
1490: "DELETE_MARKED_MESSAGES_WITH_TX",
1491: DELETE_MARKED_MESSAGES_WITH_TX);
1492: DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY = sqlProperties
1493: .getProperty(
1494: "DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY",
1495: DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY);
1496: DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX);
1497: DELETE_MARKED_MESSAGES = sqlProperties.getProperty(
1498: "DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES);
1499: DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty(
1500: "DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES);
1501: INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX);
1502: INSERT_TX_XARECOVERY = sqlProperties.getProperty(
1503: "INSERT_TX_XARECOVERY", INSERT_TX_XARECOVERY);
1504: DELETE_ALL_TX = sqlProperties.getProperty("DELETE_ALL_TX",
1505: DELETE_ALL_TX);
1506: DELETE_ALL_TX_XARECOVERY = sqlProperties.getProperty(
1507: "DELETE_ALL_TX_XARECOVERY", DELETE_ALL_TX_XARECOVERY);
1508: SELECT_ALL_TX_XARECOVERY = sqlProperties.getProperty(
1509: "SELECT_ALL_TX_XARECOVERY", SELECT_ALL_TX_XARECOVERY);
1510: SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX",
1511: SELECT_MAX_TX);
1512: SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty(
1513: "SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST);
1514: SELECT_MESSAGES_IN_DEST_XARECOVERY = sqlProperties.getProperty(
1515: "SELECT_MESSAGES_IN_DEST_XARECOVERY",
1516: SELECT_MESSAGES_IN_DEST_XARECOVERY);
1517: SELECT_MESSAGE_KEYS_IN_DEST = sqlProperties.getProperty(
1518: "SELECT_MESSAGE_KEYS_IN_DEST",
1519: SELECT_MESSAGE_KEYS_IN_DEST);
1520: SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE",
1521: SELECT_MESSAGE);
1522: SELECT_MESSAGE_XARECOVERY = sqlProperties.getProperty(
1523: "SELECT_MESSAGE_XARECOVERY", SELECT_MESSAGE_XARECOVERY);
1524: INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE",
1525: INSERT_MESSAGE);
1526: MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE",
1527: MARK_MESSAGE);
1528: DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE",
1529: DELETE_MESSAGE);
1530: UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE",
1531: UPDATE_MESSAGE);
1532: CREATE_MESSAGE_TABLE = sqlProperties.getProperty(
1533: "CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE);
1534: CREATE_IDX_MESSAGE_TXOP_TXID = sqlProperties.getProperty(
1535: "CREATE_IDX_MESSAGE_TXOP_TXID",
1536: CREATE_IDX_MESSAGE_TXOP_TXID);
1537: CREATE_IDX_MESSAGE_DESTINATION = sqlProperties.getProperty(
1538: "CREATE_IDX_MESSAGE_DESTINATION",
1539: CREATE_IDX_MESSAGE_DESTINATION);
1540: CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE",
1541: CREATE_TX_TABLE);
1542: CREATE_TX_TABLE_XARECOVERY = sqlProperties.getProperty(
1543: "CREATE_TX_TABLE_XARECOVERY",
1544: CREATE_TX_TABLE_XARECOVERY);
1545: createTables = sqlProperties.getProperty(
1546: "CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase(
1547: "true");
1548: String s = sqlProperties
1549: .getProperty("BLOB_TYPE", "OBJECT_BLOB");
1550:
1551: if (s.equals("OBJECT_BLOB")) {
1552: blobType = OBJECT_BLOB;
1553: } else if (s.equals("BYTES_BLOB")) {
1554: blobType = BYTES_BLOB;
1555: } else if (s.equals("BINARYSTREAM_BLOB")) {
1556: blobType = BINARYSTREAM_BLOB;
1557: } else if (s.equals("BLOB_BLOB")) {
1558: blobType = BLOB_BLOB;
1559: }
1560:
1561: // initialize tm and datasource
1562: initializeFields();
1563:
1564: log.debug("Creating Schema");
1565: try {
1566: createSchema();
1567: } catch (Exception e) {
1568: log.warn("Error creating schema", e);
1569: }
1570:
1571: log.debug("Resolving uncommited TXS");
1572: Throwable error = null;
1573: for (int i = 0; i <= recoveryRetries; ++i) {
1574: try {
1575: resolveAllUncommitedTXs();
1576:
1577: // done
1578: break;
1579: } catch (Throwable t) {
1580: if (i < recoveryRetries)
1581: log.warn("Error resolving transactions retries="
1582: + i + " of " + recoveryRetries, t);
1583: else
1584: error = t;
1585: }
1586: }
1587:
1588: if (error != null)
1589: SpyJMSException.rethrowAsJMSException(
1590: "Unable to resolve transactions retries="
1591: + recoveryRetries, error);
1592: }
1593:
1594: protected void initializeFields() throws MBeanException,
1595: AttributeNotFoundException, InstanceNotFoundException,
1596: ReflectionException, NamingException {
1597: //Find the ConnectionFactoryLoader MBean so we can find the datasource
1598: String dsName = (String) getServer().getAttribute(
1599: connectionManagerName, "BindName");
1600: //Get an InitialContext
1601:
1602: InitialContext ctx = new InitialContext();
1603: datasource = (DataSource) ctx.lookup(dsName);
1604:
1605: //Get the Transaction Manager so we can control the jdbc tx
1606: tm = (TransactionManager) ctx
1607: .lookup(TransactionManagerService.JNDI_NAME);
1608: }
1609:
1610: public Object getInstance() {
1611: return this ;
1612: }
1613:
1614: public ObjectName getMessageCache() {
1615: throw new UnsupportedOperationException(
1616: "This is now set on the destination manager");
1617: }
1618:
1619: public void setMessageCache(ObjectName messageCache) {
1620: throw new UnsupportedOperationException(
1621: "This is now set on the destination manager");
1622: }
1623:
1624: public ObjectName getConnectionManager() {
1625: return connectionManagerName;
1626: }
1627:
1628: public void setConnectionManager(ObjectName connectionManagerName) {
1629: this .connectionManagerName = connectionManagerName;
1630: }
1631:
1632: public MessageCache getMessageCacheInstance() {
1633: throw new UnsupportedOperationException(
1634: "This is now set on the destination manager");
1635: }
1636:
1637: public String getSqlProperties() {
1638: try {
1639: ByteArrayOutputStream boa = new ByteArrayOutputStream();
1640: sqlProperties.store(boa, "");
1641: return new String(boa.toByteArray());
1642: } catch (IOException shouldnothappen) {
1643: return "";
1644: }
1645: }
1646:
1647: public void setSqlProperties(String value) {
1648: try {
1649: ByteArrayInputStream is = new ByteArrayInputStream(value
1650: .getBytes());
1651: sqlProperties = new Properties();
1652: sqlProperties.load(is);
1653: } catch (IOException shouldnothappen) {
1654: }
1655: }
1656:
1657: public void setConnectionRetryAttempts(int value) {
1658: this .connectionRetryAttempts = value;
1659: }
1660:
1661: public int getConnectionRetryAttempts() {
1662: return this .connectionRetryAttempts;
1663: }
1664:
1665: public int getRecoveryTimeout() {
1666: return recoveryTimeout;
1667: }
1668:
1669: public void setRecoveryTimeout(int timeout) {
1670: this .recoveryTimeout = timeout;
1671: }
1672:
1673: public int getRecoveryRetries() {
1674: return recoveryRetries;
1675: }
1676:
1677: public void setRecoveryRetries(int retries) {
1678: this .recoveryRetries = retries;
1679: }
1680:
1681: public int getRecoverMessagesChunk() {
1682: return recoverMessagesChunk;
1683: }
1684:
1685: public void setRecoverMessagesChunk(int recoverMessagesChunk) {
1686: if (recoverMessagesChunk != 0 && recoverMessagesChunk != 1) {
1687: log
1688: .warn("Only the values 0 and 1 are currently support for chunk size, using chunk size=1");
1689: recoverMessagesChunk = 1;
1690: }
1691: this .recoverMessagesChunk = recoverMessagesChunk;
1692: }
1693:
1694: public boolean isXARecovery() {
1695: return xaRecovery;
1696: }
1697:
1698: public void setXARecovery(boolean xaRecovery) {
1699: this .xaRecovery = xaRecovery;
1700: }
1701:
1702: public int getStatementRetries() {
1703: return statementRetries;
1704: }
1705:
1706: public void setStatementRetries(int statementRetries) {
1707: if (statementRetries < 0)
1708: statementRetries = 0;
1709: this.statementRetries = statementRetries;
1710: }
1711: }
|