Source Code Cross Referenced for PersistenceManager.java in  » EJB-Server-JBoss-4.2.1 » messaging » org » jboss » mq » pm » jdbc2 » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » EJB Server JBoss 4.2.1 » messaging » org.jboss.mq.pm.jdbc2 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * JBoss, Home of Professional Open Source.
0003:         * Copyright 2006, Red Hat Middleware LLC, and individual contributors
0004:         * as indicated by the @author tags. See the copyright.txt file in the
0005:         * distribution for a full listing of individual contributors.
0006:         *
0007:         * This is free software; you can redistribute it and/or modify it
0008:         * under the terms of the GNU Lesser General Public License as
0009:         * published by the Free Software Foundation; either version 2.1 of
0010:         * the License, or (at your option) any later version.
0011:         *
0012:         * This software is distributed in the hope that it will be useful,
0013:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0014:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0015:         * Lesser General Public License for more details.
0016:         *
0017:         * You should have received a copy of the GNU Lesser General Public
0018:         * License along with this software; if not, write to the Free
0019:         * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
0020:         * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
0021:         */
0022:        package org.jboss.mq.pm.jdbc2;
0023:
0024:        import java.io.ByteArrayInputStream;
0025:        import java.io.ByteArrayOutputStream;
0026:        import java.io.IOException;
0027:        import java.io.ObjectInputStream;
0028:        import java.io.ObjectOutputStream;
0029:        import java.io.StreamCorruptedException;
0030:        import java.sql.Connection;
0031:        import java.sql.PreparedStatement;
0032:        import java.sql.ResultSet;
0033:        import java.sql.SQLException;
0034:        import java.util.HashMap;
0035:        import java.util.Iterator;
0036:        import java.util.Map;
0037:        import java.util.Properties;
0038:
0039:        import javax.jms.JMSException;
0040:        import javax.management.AttributeNotFoundException;
0041:        import javax.management.InstanceNotFoundException;
0042:        import javax.management.MBeanException;
0043:        import javax.management.ObjectName;
0044:        import javax.management.ReflectionException;
0045:        import javax.naming.InitialContext;
0046:        import javax.naming.NamingException;
0047:        import javax.sql.DataSource;
0048:        import javax.transaction.Status;
0049:        import javax.transaction.Transaction;
0050:        import javax.transaction.TransactionManager;
0051:        import javax.transaction.xa.Xid;
0052:
0053:        import org.jboss.mq.SpyDestination;
0054:        import org.jboss.mq.SpyJMSException;
0055:        import org.jboss.mq.SpyMessage;
0056:        import org.jboss.mq.SpyTopic;
0057:        import org.jboss.mq.pm.CacheStore;
0058:        import org.jboss.mq.pm.Tx;
0059:        import org.jboss.mq.pm.TxManager;
0060:        import org.jboss.mq.server.JMSDestination;
0061:        import org.jboss.mq.server.MessageCache;
0062:        import org.jboss.mq.server.MessageReference;
0063:        import org.jboss.system.ServiceMBeanSupport;
0064:        import org.jboss.tm.TransactionManagerService;
0065:        import org.jboss.tm.TransactionTimeoutConfiguration;
0066:        import org.jboss.util.UnreachableStatementException;
0067:
0068:        import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
0069:
0070:        /**
0071:         * This class manages all persistence related services for JDBC based
0072:         * persistence.
0073:         *
0074:         * @author Jayesh Parayali (jayeshpk1@yahoo.com)
0075:         * @author Hiram Chirino (cojonudo14@hotmail.com)
0076:         * @author Adrian Brock (adrian@jboss.com)
0077:         * @version $Revision: 61581 $
0078:         */
0079:        public class PersistenceManager extends ServiceMBeanSupport implements 
0080:                PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager,
0081:                CacheStore {
0082:            /** FAQ about concurrency problems */
0083:            private static String CONCURRENCY_WARNING = "\nCommon reasons for missing messages are \n1) You have multiple JBossMQs running over the same database.\n2) You are using a replicating database that is not keeping up with replication.";
0084:
0085:            /////////////////////////////////////////////////////////////////////////////////
0086:            //
0087:            // TX state attibutes
0088:            //
0089:            /////////////////////////////////////////////////////////////////////////////////
0090:
0091:            /** The next transaction id */
0092:            protected SynchronizedLong nextTransactionId = new SynchronizedLong(
0093:                    0l);
0094:
0095:            /** The jta transaction manager */
0096:            protected TxManager txManager;
0097:
0098:            /** The DataSource */
0099:            protected DataSource datasource;
0100:
0101:            /** The JBossMQ transaction mananger */
0102:            protected TransactionManager tm;
0103:
0104:            /** The override recovery timeout */
0105:            private int recoveryTimeout = 0;
0106:
0107:            /** The recovery retries */
0108:            private int recoveryRetries = 0;
0109:
0110:            /** The recover messages chunk  */
0111:            private int recoverMessagesChunk = 0;
0112:
0113:            /** The statement retries */
0114:            private int statementRetries = 5;
0115:
0116:            /////////////////////////////////////////////////////////////////////////////////
0117:            //
0118:            // JDBC Access Attributes
0119:            //
0120:            /////////////////////////////////////////////////////////////////////////////////
0121:
0122:            protected String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?";
0123:            protected String UPDATE_MARKED_MESSAGES_XARECOVERY = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID NOT IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID IS NOT NULL)";
0124:            protected String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
0125:            protected String DELETE_MARKED_MESSAGES_WITH_TX = "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?";
0126:            protected String DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY = "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID = NULL) AND TXOP=?";
0127:            protected String DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?";
0128:            protected String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?";
0129:            protected String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXOP = 'T'";
0130:            protected String INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)";
0131:            protected String INSERT_TX_XARECOVERY = "INSERT INTO JMS_TRANSACTIONS (TXID, XID) values(?, ?)";
0132:            protected String DELETE_ALL_TX = "DELETE FROM JMS_TRANSACTIONS";
0133:            protected String DELETE_ALL_TX_XARECOVERY = "DELETE FROM JMS_TRANSACTIONS WHERE XID = NULL";
0134:            protected String SELECT_MAX_TX = "SELECT MAX(TXID) FROM (SELECT MAX(TXID) FROM JMS_TRANSACTIONS UNION SELECT MAX(TXID) FROM JMS_MESSAGES)";
0135:            protected String SELECT_ALL_TX_XARECOVERY = "SELECT TXID, XID FROM JMS_TRANSACTIONS";
0136:            protected String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?";
0137:            protected String SELECT_MESSAGES_IN_DEST_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE DESTINATION=?";
0138:            protected String SELECT_MESSAGE_KEYS_IN_DEST = "SELECT MESSAGEID FROM JMS_MESSAGES WHERE DESTINATION=?";
0139:            protected String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
0140:            protected String SELECT_MESSAGE_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
0141:            protected String INSERT_MESSAGE = "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)";
0142:            protected String MARK_MESSAGE = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
0143:            protected String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
0144:            protected String UPDATE_MESSAGE = "UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
0145:            protected String CREATE_MESSAGE_TABLE = "CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, "
0146:                    + "DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1),"
0147:                    + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
0148:            protected String CREATE_IDX_MESSAGE_TXOP_TXID = "CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)";
0149:            protected String CREATE_IDX_MESSAGE_DESTINATION = "CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)";
0150:            protected String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )";
0151:            protected String CREATE_TX_TABLE_XARECOVERY = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, XID OBJECT, PRIMARY KEY (TXID) )";
0152:
0153:            protected static final int OBJECT_BLOB = 0;
0154:            protected static final int BYTES_BLOB = 1;
0155:            protected static final int BINARYSTREAM_BLOB = 2;
0156:            protected static final int BLOB_BLOB = 3;
0157:
0158:            protected int blobType = OBJECT_BLOB;
0159:            protected boolean createTables;
0160:
0161:            protected int connectionRetryAttempts = 5;
0162:
0163:            protected boolean xaRecovery = false;
0164:
0165:            /////////////////////////////////////////////////////////////////////////////////
0166:            //
0167:            // Constructor.
0168:            //
0169:            /////////////////////////////////////////////////////////////////////////////////
0170:            public PersistenceManager() throws javax.jms.JMSException {
0171:                txManager = new TxManager(this );
0172:            }
0173:
0174:            /**
0175:             * This inner class helps handle the tx management of the jdbc connections.
0176:             * 
0177:             */
0178:            protected class TransactionManagerStrategy {
0179:
0180:                Transaction threadTx;
0181:
0182:                void startTX() throws JMSException {
0183:                    try {
0184:                        // Thread arriving must be clean (jboss doesn't set the thread
0185:                        // previously). However optimized calls come with associated
0186:                        // thread for example. We suspend the thread association here, and
0187:                        // resume in the finally block of the following try.
0188:                        threadTx = tm.suspend();
0189:
0190:                        // Always begin a transaction
0191:                        tm.begin();
0192:                    } catch (Exception e) {
0193:                        try {
0194:                            if (threadTx != null)
0195:                                tm.resume(threadTx);
0196:                        } catch (Exception ignore) {
0197:                        }
0198:                        throw new SpyJMSException(
0199:                                "Could not start a transaction with the transaction manager.",
0200:                                e);
0201:                    }
0202:                }
0203:
0204:                void setRollbackOnly() throws JMSException {
0205:                    try {
0206:                        tm.setRollbackOnly();
0207:                    } catch (Exception e) {
0208:                        throw new SpyJMSException(
0209:                                "Could not start a mark the transaction for rollback .",
0210:                                e);
0211:                    }
0212:                }
0213:
0214:                void endTX() throws JMSException {
0215:                    try {
0216:                        if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
0217:                            tm.rollback();
0218:                        } else {
0219:                            tm.commit();
0220:                        }
0221:                    } catch (Exception e) {
0222:                        throw new SpyJMSException(
0223:                                "Could not start a transaction with the transaction manager.",
0224:                                e);
0225:                    } finally {
0226:                        try {
0227:                            if (threadTx != null)
0228:                                tm.resume(threadTx);
0229:                        } catch (Exception ignore) {
0230:                        }
0231:                    }
0232:                }
0233:            }
0234:
0235:            /////////////////////////////////////////////////////////////////////////////////
0236:            //
0237:            // TX Resolution.
0238:            //
0239:            /////////////////////////////////////////////////////////////////////////////////
0240:
0241:            synchronized protected void createSchema() throws JMSException {
0242:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
0243:                tms.startTX();
0244:                Connection c = null;
0245:                PreparedStatement stmt = null;
0246:                boolean threadWasInterrupted = Thread.interrupted();
0247:                try {
0248:                    if (createTables) {
0249:                        c = this .getConnection();
0250:
0251:                        boolean createdMessageTable = false;
0252:                        try {
0253:                            stmt = c.prepareStatement(CREATE_MESSAGE_TABLE);
0254:                            stmt.executeUpdate();
0255:                            createdMessageTable = true;
0256:                        } catch (SQLException e) {
0257:                            log.debug("Could not create table with SQL: "
0258:                                    + CREATE_MESSAGE_TABLE, e);
0259:                        } finally {
0260:                            try {
0261:                                if (stmt != null)
0262:                                    stmt.close();
0263:                            } catch (Throwable ignored) {
0264:                                log.trace("Ignored: " + ignored);
0265:                            }
0266:                            stmt = null;
0267:                        }
0268:
0269:                        if (createdMessageTable) {
0270:                            try {
0271:                                stmt = c
0272:                                        .prepareStatement(CREATE_IDX_MESSAGE_TXOP_TXID);
0273:                                stmt.executeUpdate();
0274:                            } catch (SQLException e) {
0275:                                log.debug("Could not create index with SQL: "
0276:                                        + CREATE_IDX_MESSAGE_TXOP_TXID, e);
0277:                            } finally {
0278:                                try {
0279:                                    if (stmt != null)
0280:                                        stmt.close();
0281:                                } catch (Throwable ignored) {
0282:                                    log.trace("Ignored: " + ignored);
0283:                                }
0284:                                stmt = null;
0285:                            }
0286:                            try {
0287:                                stmt = c
0288:                                        .prepareStatement(CREATE_IDX_MESSAGE_DESTINATION);
0289:                                stmt.executeUpdate();
0290:                            } catch (SQLException e) {
0291:                                log.debug("Could not create index with SQL: "
0292:                                        + CREATE_IDX_MESSAGE_DESTINATION, e);
0293:                            } finally {
0294:                                try {
0295:                                    if (stmt != null)
0296:                                        stmt.close();
0297:                                } catch (Throwable ignored) {
0298:                                    log.trace("Ignored: " + ignored);
0299:                                }
0300:                                stmt = null;
0301:                            }
0302:                        }
0303:
0304:                        String createTxTable = CREATE_TX_TABLE;
0305:                        if (xaRecovery)
0306:                            createTxTable = CREATE_TX_TABLE_XARECOVERY;
0307:                        try {
0308:                            stmt = c.prepareStatement(createTxTable);
0309:                            stmt.executeUpdate();
0310:                        } catch (SQLException e) {
0311:                            log.debug("Could not create table with SQL: "
0312:                                    + createTxTable, e);
0313:                        } finally {
0314:                            try {
0315:                                if (stmt != null)
0316:                                    stmt.close();
0317:                            } catch (Throwable ignored) {
0318:                                log.trace("Ignored: " + ignored);
0319:                            }
0320:                            stmt = null;
0321:                        }
0322:                    }
0323:                } catch (SQLException e) {
0324:                    tms.setRollbackOnly();
0325:                    throw new SpyJMSException(
0326:                            "Could not get a connection for jdbc2 table construction ",
0327:                            e);
0328:                } finally {
0329:                    try {
0330:                        if (stmt != null)
0331:                            stmt.close();
0332:                    } catch (Throwable ignore) {
0333:                    }
0334:                    stmt = null;
0335:                    try {
0336:                        if (c != null)
0337:                            c.close();
0338:                    } catch (Throwable ignore) {
0339:                    }
0340:                    c = null;
0341:                    tms.endTX();
0342:
0343:                    // Restore the interrupted state of the thread
0344:                    if (threadWasInterrupted)
0345:                        Thread.currentThread().interrupt();
0346:                }
0347:            }
0348:
0349:            synchronized protected void resolveAllUncommitedTXs()
0350:                    throws JMSException {
0351:                // We perform recovery in a different thread to the table creation
0352:                // Postgres doesn't like create table failing in the same transaction
0353:                // as other operations
0354:
0355:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
0356:                tms.startTX();
0357:                Connection c = null;
0358:                PreparedStatement stmt = null;
0359:                ResultSet rs = null;
0360:                boolean threadWasInterrupted = Thread.interrupted();
0361:                try {
0362:                    c = this .getConnection();
0363:
0364:                    // Find out what the next TXID should be
0365:                    stmt = c.prepareStatement(SELECT_MAX_TX);
0366:                    rs = stmt.executeQuery();
0367:                    if (rs.next())
0368:                        nextTransactionId.set(rs.getLong(1) + 1);
0369:                    rs.close();
0370:                    rs = null;
0371:                    stmt.close();
0372:                    stmt = null;
0373:
0374:                    // Delete all the temporary messages.
0375:                    stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES);
0376:                    stmt.executeUpdate();
0377:                    stmt.close();
0378:                    stmt = null;
0379:
0380:                    // Delete all the messages that were added but thier tx's were not commited.
0381:                    String deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX;
0382:                    if (xaRecovery)
0383:                        deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY;
0384:                    stmt = c.prepareStatement(deleteMarkedMessagesWithTx);
0385:                    stmt.setString(1, "A");
0386:                    stmt.executeUpdate();
0387:                    stmt.close();
0388:                    stmt = null;
0389:
0390:                    // Restore all the messages that were removed but their tx's were not commited.
0391:                    String updateMarkedMessages = UPDATE_MARKED_MESSAGES;
0392:                    if (xaRecovery)
0393:                        updateMarkedMessages = UPDATE_MARKED_MESSAGES_XARECOVERY;
0394:                    stmt = c.prepareStatement(updateMarkedMessages);
0395:                    stmt.setNull(1, java.sql.Types.BIGINT);
0396:                    stmt.setString(2, "A");
0397:                    stmt.setString(3, "D");
0398:                    stmt.executeUpdate();
0399:                    stmt.close();
0400:                    stmt = null;
0401:
0402:                    // Now recovery is complete, clear the transaction table.
0403:                    String deleteAllTx = DELETE_ALL_TX;
0404:                    if (xaRecovery)
0405:                        deleteAllTx = DELETE_ALL_TX_XARECOVERY;
0406:                    stmt = c.prepareStatement(deleteAllTx);
0407:                    stmt.execute();
0408:                    stmt.close();
0409:                    stmt = null;
0410:
0411:                    // If we are doing XARecovery restore the prepared transactions
0412:                    if (xaRecovery) {
0413:                        stmt = c.prepareStatement(SELECT_ALL_TX_XARECOVERY);
0414:                        rs = stmt.executeQuery();
0415:                        while (rs.next()) {
0416:                            long txid = rs.getLong(1);
0417:                            Xid xid = extractXid(rs, 2);
0418:                            Tx tx = new Tx(txid);
0419:                            tx.setXid(xid);
0420:                            tx.checkPersisted();
0421:                            txManager.restoreTx(tx);
0422:                        }
0423:                        rs.close();
0424:                        rs = null;
0425:                        stmt.close();
0426:                        stmt = null;
0427:                    }
0428:                } catch (Exception e) {
0429:                    tms.setRollbackOnly();
0430:                    throw new SpyJMSException(
0431:                            "Could not resolve uncommited transactions.  Message recovery may not be accurate",
0432:                            e);
0433:                } finally {
0434:                    try {
0435:                        if (rs != null)
0436:                            rs.close();
0437:                    } catch (Throwable ignore) {
0438:                    }
0439:                    try {
0440:                        if (stmt != null)
0441:                            stmt.close();
0442:                    } catch (Throwable ignore) {
0443:                    }
0444:                    try {
0445:                        if (c != null)
0446:                            c.close();
0447:                    } catch (Throwable ignore) {
0448:                    }
0449:                    tms.endTX();
0450:
0451:                    // Restore the interrupted state of the thread
0452:                    if (threadWasInterrupted)
0453:                        Thread.currentThread().interrupt();
0454:                }
0455:            }
0456:
0457:            /////////////////////////////////////////////////////////////////////////////////
0458:            //
0459:            // Message Recovery
0460:            //
0461:            /////////////////////////////////////////////////////////////////////////////////
0462:
0463:            synchronized public void restoreQueue(JMSDestination jmsDest,
0464:                    SpyDestination dest) throws JMSException {
0465:                if (jmsDest == null)
0466:                    throw new IllegalArgumentException(
0467:                            "Must supply non null JMSDestination to restoreQueue");
0468:                if (dest == null)
0469:                    throw new IllegalArgumentException(
0470:                            "Must supply non null SpyDestination to restoreQueue");
0471:
0472:                boolean canOverrideTimeout = (tm instanceof  TransactionTimeoutConfiguration);
0473:                int previousTimeout = 0;
0474:                try {
0475:                    // Set our timeout
0476:                    if (recoveryTimeout != 0) {
0477:                        if (canOverrideTimeout) {
0478:                            previousTimeout = ((TransactionTimeoutConfiguration) tm)
0479:                                    .getTransactionTimeout();
0480:                            tm.setTransactionTimeout(recoveryTimeout);
0481:                        } else {
0482:                            log
0483:                                    .debug("Cannot override recovery timeout, TransactionManager does implement "
0484:                                            + TransactionTimeoutConfiguration.class
0485:                                                    .getName());
0486:                        }
0487:                    }
0488:
0489:                    // restore the queue
0490:                    try {
0491:                        internalRestoreQueue(jmsDest, dest);
0492:                    } finally {
0493:                        // restore the transaction timeout
0494:                        if (recoveryTimeout != 0 && canOverrideTimeout)
0495:                            tm.setTransactionTimeout(previousTimeout);
0496:                    }
0497:                } catch (Exception e) {
0498:                    SpyJMSException.rethrowAsJMSException(
0499:                            "Unexpected error in recovery", e);
0500:                }
0501:            }
0502:
0503:            synchronized protected void internalRestoreQueue(
0504:                    JMSDestination jmsDest, SpyDestination dest)
0505:                    throws JMSException {
0506:                // Work out the prepared transactions
0507:                Map prepared = null;
0508:                if (xaRecovery) {
0509:                    prepared = new HashMap();
0510:                    Map map = txManager.getPreparedTransactions();
0511:                    for (Iterator i = map.values().iterator(); i.hasNext();) {
0512:                        TxManager.PreparedInfo info = (TxManager.PreparedInfo) i
0513:                                .next();
0514:                        for (Iterator j = info.getTxids().iterator(); j
0515:                                .hasNext();) {
0516:                            Tx tx = (Tx) j.next();
0517:                            prepared.put(new Long(tx.longValue()), tx);
0518:                        }
0519:                    }
0520:                }
0521:
0522:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
0523:                tms.startTX();
0524:                Connection c = null;
0525:                PreparedStatement stmt = null;
0526:                PreparedStatement stmt2 = null;
0527:                ResultSet rs = null;
0528:                boolean threadWasInterrupted = Thread.interrupted();
0529:                try {
0530:                    String selectMessagesInDest = SELECT_MESSAGES_IN_DEST;
0531:                    String selectMessage = SELECT_MESSAGE;
0532:                    if (xaRecovery) {
0533:                        selectMessagesInDest = SELECT_MESSAGES_IN_DEST_XARECOVERY;
0534:                        selectMessage = SELECT_MESSAGE_XARECOVERY;
0535:                    }
0536:                    c = this .getConnection();
0537:                    if (recoverMessagesChunk == 0)
0538:                        stmt = c.prepareStatement(selectMessagesInDest);
0539:                    else {
0540:                        stmt = c.prepareStatement(SELECT_MESSAGE_KEYS_IN_DEST);
0541:                        stmt2 = c.prepareStatement(selectMessage);
0542:                    }
0543:                    stmt.setString(1, dest.toString());
0544:
0545:                    long txid = 0;
0546:                    String txop = null;
0547:                    rs = stmt.executeQuery();
0548:                    int counter = 0;
0549:                    int recovery = 0;
0550:                    while (rs.next()) {
0551:                        long msgid = rs.getLong(1);
0552:                        SpyMessage message = null;
0553:                        if (recoverMessagesChunk == 0) {
0554:                            message = extractMessage(rs);
0555:                            if (xaRecovery) {
0556:                                txid = rs.getLong(3);
0557:                                txop = rs.getString(4);
0558:                            }
0559:                        } else {
0560:                            ResultSet rs2 = null;
0561:                            try {
0562:                                stmt2.setLong(1, msgid);
0563:                                stmt2.setString(2, dest.toString());
0564:                                rs2 = stmt2.executeQuery();
0565:                                if (rs2.next()) {
0566:                                    message = extractMessage(rs2);
0567:                                    if (xaRecovery) {
0568:                                        txid = rs.getLong(3);
0569:                                        txop = rs.getString(4);
0570:                                    }
0571:                                } else
0572:                                    log.warn("Failed to find message msgid="
0573:                                            + msgid + " dest=" + dest);
0574:                            } finally {
0575:                                if (rs2 != null) {
0576:                                    try {
0577:                                        rs2.close();
0578:                                    } catch (Exception ignored) {
0579:                                    }
0580:                                }
0581:                            }
0582:                        }
0583:                        // The durable subscription is not serialized
0584:                        if (dest instanceof  SpyTopic)
0585:                            message.header.durableSubscriberID = ((SpyTopic) dest)
0586:                                    .getDurableSubscriptionID();
0587:
0588:                        if (xaRecovery == false || txid == 0 || txop == null)
0589:                            jmsDest.restoreMessage(message);
0590:                        else {
0591:                            Tx tx = (Tx) prepared.get(new Long(txid));
0592:                            if (tx == null)
0593:                                jmsDest.restoreMessage(message);
0594:                            else if ("A".equals(txop)) {
0595:                                jmsDest.restoreMessage(message, tx, Tx.ADD);
0596:                                recovery++;
0597:                            } else if ("D".equals(txop)) {
0598:                                jmsDest.restoreMessage(message, tx, Tx.REMOVE);
0599:                                recovery++;
0600:                            } else
0601:                                throw new IllegalStateException("Unknown txop="
0602:                                        + txop + " for msg=" + msgid + " dest="
0603:                                        + dest);
0604:                        }
0605:                        counter++;
0606:                    }
0607:
0608:                    log.debug("Restored " + counter + " message(s) to: " + dest
0609:                            + " " + recovery + " need recovery.");
0610:                } catch (IOException e) {
0611:                    tms.setRollbackOnly();
0612:                    throw new SpyJMSException(
0613:                            "Could not restore messages to destination : "
0614:                                    + dest.toString(), e);
0615:                } catch (SQLException e) {
0616:                    tms.setRollbackOnly();
0617:                    throw new SpyJMSException(
0618:                            "Could not restore messages to destination : "
0619:                                    + dest.toString(), e);
0620:                } finally {
0621:                    try {
0622:                        if (rs != null)
0623:                            rs.close();
0624:                    } catch (Throwable ignore) {
0625:                    }
0626:                    try {
0627:                        if (stmt != null)
0628:                            stmt.close();
0629:                    } catch (Throwable ignore) {
0630:                    }
0631:                    try {
0632:                        if (c != null)
0633:                            c.close();
0634:                    } catch (Throwable ignore) {
0635:                    }
0636:                    tms.endTX();
0637:
0638:                    // Restore the interrupted state of the thread
0639:                    if (threadWasInterrupted)
0640:                        Thread.currentThread().interrupt();
0641:                }
0642:
0643:            }
0644:
0645:            SpyMessage extractMessage(ResultSet rs) throws SQLException,
0646:                    IOException {
0647:                try {
0648:                    long messageid = rs.getLong(1);
0649:
0650:                    SpyMessage message = null;
0651:
0652:                    if (blobType == OBJECT_BLOB) {
0653:
0654:                        message = (SpyMessage) rs.getObject(2);
0655:
0656:                    } else if (blobType == BYTES_BLOB) {
0657:
0658:                        byte[] st = rs.getBytes(2);
0659:                        ByteArrayInputStream baip = new ByteArrayInputStream(st);
0660:                        ObjectInputStream ois = new ObjectInputStream(baip);
0661:                        message = SpyMessage.readMessage(ois);
0662:
0663:                    } else if (blobType == BINARYSTREAM_BLOB) {
0664:
0665:                        ObjectInputStream ois = new ObjectInputStream(rs
0666:                                .getBinaryStream(2));
0667:                        message = SpyMessage.readMessage(ois);
0668:
0669:                    } else if (blobType == BLOB_BLOB) {
0670:
0671:                        ObjectInputStream ois = new ObjectInputStream(rs
0672:                                .getBlob(2).getBinaryStream());
0673:                        message = SpyMessage.readMessage(ois);
0674:                    }
0675:
0676:                    message.header.messageId = messageid;
0677:                    return message;
0678:                } catch (StreamCorruptedException e) {
0679:                    throw new IOException("Could not load the message: " + e);
0680:                }
0681:            }
0682:
0683:            Xid extractXid(ResultSet rs, int column) throws SQLException,
0684:                    IOException, ClassNotFoundException {
0685:                try {
0686:                    Xid xid = null;
0687:
0688:                    if (blobType == OBJECT_BLOB) {
0689:                        xid = (Xid) rs.getObject(column);
0690:                    } else if (blobType == BYTES_BLOB) {
0691:                        byte[] st = rs.getBytes(column);
0692:                        ByteArrayInputStream baip = new ByteArrayInputStream(st);
0693:                        ObjectInputStream ois = new ObjectInputStream(baip);
0694:                        xid = (Xid) ois.readObject();
0695:                    } else if (blobType == BINARYSTREAM_BLOB) {
0696:                        ObjectInputStream ois = new ObjectInputStream(rs
0697:                                .getBinaryStream(column));
0698:                        xid = (Xid) ois.readObject();
0699:                    } else if (blobType == BLOB_BLOB) {
0700:                        ObjectInputStream ois = new ObjectInputStream(rs
0701:                                .getBlob(column).getBinaryStream());
0702:                        xid = (Xid) ois.readObject();
0703:                    }
0704:
0705:                    return xid;
0706:                } catch (StreamCorruptedException e) {
0707:                    throw new IOException("Could not load the message: " + e);
0708:                }
0709:            }
0710:
0711:            /////////////////////////////////////////////////////////////////////////////////
0712:            //
0713:            // TX Commit
0714:            //
0715:            /////////////////////////////////////////////////////////////////////////////////
0716:            public void commitPersistentTx(Tx txId)
0717:                    throws javax.jms.JMSException {
0718:                if (txId.wasPersisted() == false)
0719:                    return;
0720:
0721:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
0722:                tms.startTX();
0723:                Connection c = null;
0724:                boolean threadWasInterrupted = Thread.interrupted();
0725:                try {
0726:
0727:                    c = this .getConnection();
0728:                    removeMarkedMessages(c, txId, "D");
0729:                    removeTXRecord(c, txId.longValue());
0730:
0731:                } catch (SQLException e) {
0732:                    tms.setRollbackOnly();
0733:                    throw new SpyJMSException("Could not commit tx: " + txId, e);
0734:                } finally {
0735:                    try {
0736:                        if (c != null)
0737:                            c.close();
0738:                    } catch (Throwable ignore) {
0739:                    }
0740:                    tms.endTX();
0741:
0742:                    // Restore the interrupted state of the thread
0743:                    if (threadWasInterrupted)
0744:                        Thread.currentThread().interrupt();
0745:                }
0746:            }
0747:
0748:            public void removeMarkedMessages(Connection c, Tx txid, String mark)
0749:                    throws SQLException {
0750:                PreparedStatement stmt = null;
0751:                try {
0752:                    stmt = c.prepareStatement(DELETE_MARKED_MESSAGES);
0753:                    stmt.setLong(1, txid.longValue());
0754:                    stmt.setString(2, mark);
0755:                    stmt.executeUpdate();
0756:                } finally {
0757:                    try {
0758:                        if (stmt != null)
0759:                            stmt.close();
0760:                    } catch (Throwable e) {
0761:                    }
0762:                }
0763:            }
0764:
0765:            public void addTXRecord(Connection c, Tx txid) throws SQLException,
0766:                    IOException {
0767:                PreparedStatement stmt = null;
0768:                try {
0769:                    String insertTx = INSERT_TX;
0770:                    if (xaRecovery)
0771:                        insertTx = INSERT_TX_XARECOVERY;
0772:                    stmt = c.prepareStatement(insertTx);
0773:                    stmt.setLong(1, txid.longValue());
0774:                    if (xaRecovery) {
0775:                        Xid xid = txid.getXid();
0776:                        if (xid != null)
0777:                            setBlob(stmt, 2, xid);
0778:                        else
0779:                            stmt.setNull(2, java.sql.Types.BLOB);
0780:                    }
0781:                    stmt.executeUpdate();
0782:                } finally {
0783:                    try {
0784:                        if (stmt != null)
0785:                            stmt.close();
0786:                    } catch (Throwable e) {
0787:                    }
0788:                }
0789:            }
0790:
0791:            public void removeTXRecord(Connection c, long txid)
0792:                    throws SQLException {
0793:                PreparedStatement stmt = null;
0794:                try {
0795:                    stmt = c.prepareStatement(DELETE_TX);
0796:                    stmt.setLong(1, txid);
0797:                    stmt.executeUpdate();
0798:                } finally {
0799:                    try {
0800:                        if (stmt != null)
0801:                            stmt.close();
0802:                    } catch (Throwable e) {
0803:                    }
0804:                }
0805:            }
0806:
0807:            /////////////////////////////////////////////////////////////////////////////////
0808:            //
0809:            // TX Rollback
0810:            //
0811:            /////////////////////////////////////////////////////////////////////////////////
0812:            public void rollbackPersistentTx(Tx txId) throws JMSException {
0813:                if (txId.wasPersisted() == false)
0814:                    return;
0815:
0816:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
0817:                tms.startTX();
0818:                Connection c = null;
0819:                PreparedStatement stmt = null;
0820:                boolean threadWasInterrupted = Thread.interrupted();
0821:                try {
0822:
0823:                    c = this .getConnection();
0824:                    removeMarkedMessages(c, txId, "A");
0825:                    removeTXRecord(c, txId.longValue());
0826:
0827:                    // Restore all the messages that were logically removed.
0828:                    stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX);
0829:                    stmt.setNull(1, java.sql.Types.BIGINT);
0830:                    stmt.setString(2, "A");
0831:                    stmt.setString(3, "D");
0832:                    stmt.setLong(4, txId.longValue());
0833:                    stmt.executeUpdate();
0834:                    stmt.close();
0835:                    stmt = null;
0836:                } catch (SQLException e) {
0837:                    tms.setRollbackOnly();
0838:                    throw new SpyJMSException("Could not rollback tx: " + txId,
0839:                            e);
0840:                } finally {
0841:                    try {
0842:                        if (stmt != null)
0843:                            stmt.close();
0844:                    } catch (Throwable ignore) {
0845:                    }
0846:                    try {
0847:                        if (c != null)
0848:                            c.close();
0849:                    } catch (Throwable ignore) {
0850:                    }
0851:                    tms.endTX();
0852:
0853:                    // Restore the interrupted state of the thread
0854:                    if (threadWasInterrupted)
0855:                        Thread.currentThread().interrupt();
0856:                }
0857:
0858:            }
0859:
0860:            /////////////////////////////////////////////////////////////////////////////////
0861:            //
0862:            // TX Creation
0863:            //
0864:            /////////////////////////////////////////////////////////////////////////////////
0865:            public Tx createPersistentTx() throws JMSException {
0866:                Tx id = new Tx(nextTransactionId.increment());
0867:                return id;
0868:            }
0869:
0870:            public void insertPersistentTx(TransactionManagerStrategy tms,
0871:                    Connection c, Tx tx) throws JMSException {
0872:                try {
0873:                    if (tx != null && tx.checkPersisted() == false)
0874:                        addTXRecord(c, tx);
0875:                } catch (Exception e) {
0876:                    tms.setRollbackOnly();
0877:                    throw new SpyJMSException("Could not create tx: "
0878:                            + tx.longValue(), e);
0879:                }
0880:            }
0881:
0882:            /////////////////////////////////////////////////////////////////////////////////
0883:            //
0884:            // Adding a message
0885:            //
0886:            /////////////////////////////////////////////////////////////////////////////////
0887:            public void add(MessageReference messageRef, Tx txId)
0888:                    throws javax.jms.JMSException {
0889:                boolean trace = log.isTraceEnabled();
0890:                if (trace)
0891:                    log.trace("About to add message " + messageRef
0892:                            + " transaction=" + txId);
0893:
0894:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
0895:                tms.startTX();
0896:                Connection c = null;
0897:                boolean threadWasInterrupted = Thread.interrupted();
0898:                try {
0899:                    c = this .getConnection();
0900:
0901:                    // Lazily write the peristent transaction
0902:                    insertPersistentTx(tms, c, txId);
0903:
0904:                    // Synchronize on the message to avoid a race with the softener
0905:                    synchronized (messageRef) {
0906:                        SpyMessage message = messageRef.getMessage();
0907:
0908:                        // has it allready been stored by the message cache interface??
0909:                        if (messageRef.stored == MessageReference.STORED) {
0910:                            if (trace)
0911:                                log.trace("Updating message " + messageRef
0912:                                        + " transaction=" + txId);
0913:
0914:                            markMessage(c, messageRef.messageId, messageRef
0915:                                    .getPersistentKey(), txId, "A");
0916:                        } else {
0917:                            if (trace)
0918:                                log.trace("Inserting message " + messageRef
0919:                                        + " transaction=" + txId);
0920:
0921:                            add(c, messageRef.getPersistentKey(), message,
0922:                                    txId, "A");
0923:                            messageRef.setStored(MessageReference.STORED);
0924:                        }
0925:                        if (trace)
0926:                            log.trace("Added message " + messageRef
0927:                                    + " transaction=" + txId);
0928:                    }
0929:                } catch (IOException e) {
0930:                    tms.setRollbackOnly();
0931:                    throw new SpyJMSException("Could not store message: "
0932:                            + messageRef, e);
0933:                } catch (SQLException e) {
0934:                    tms.setRollbackOnly();
0935:                    throw new SpyJMSException("Could not store message: "
0936:                            + messageRef, e);
0937:                } finally {
0938:                    try {
0939:                        if (c != null)
0940:                            c.close();
0941:                    } catch (Throwable ignore) {
0942:                    }
0943:                    tms.endTX();
0944:
0945:                    // Restore the interrupted state of the thread
0946:                    if (threadWasInterrupted)
0947:                        Thread.currentThread().interrupt();
0948:                }
0949:            }
0950:
0951:            protected void add(Connection c, String queue, SpyMessage message,
0952:                    Tx txId, String mark) throws SQLException, IOException {
0953:                PreparedStatement stmt = null;
0954:                try {
0955:
0956:                    stmt = c.prepareStatement(INSERT_MESSAGE);
0957:
0958:                    stmt.setLong(1, message.header.messageId);
0959:                    stmt.setString(2, queue);
0960:                    setBlob(stmt, 3, message);
0961:
0962:                    if (txId != null)
0963:                        stmt.setLong(4, txId.longValue());
0964:                    else
0965:                        stmt.setNull(4, java.sql.Types.BIGINT);
0966:                    stmt.setString(5, mark);
0967:
0968:                    stmt.executeUpdate();
0969:                } finally {
0970:                    try {
0971:                        if (stmt != null)
0972:                            stmt.close();
0973:                    } catch (Throwable ignore) {
0974:                    }
0975:                }
0976:            }
0977:
0978:            public void markMessage(Connection c, long messageid,
0979:                    String destination, Tx txId, String mark)
0980:                    throws SQLException {
0981:                PreparedStatement stmt = null;
0982:                try {
0983:
0984:                    stmt = c.prepareStatement(MARK_MESSAGE);
0985:                    if (txId == null) {
0986:                        stmt.setNull(1, java.sql.Types.BIGINT);
0987:                    } else {
0988:                        stmt.setLong(1, txId.longValue());
0989:                    }
0990:                    stmt.setString(2, mark);
0991:                    stmt.setLong(3, messageid);
0992:                    stmt.setString(4, destination);
0993:                    stmt.executeUpdate();
0994:                } finally {
0995:                    try {
0996:                        if (stmt != null)
0997:                            stmt.close();
0998:                    } catch (Throwable ignore) {
0999:                    }
1000:                }
1001:
1002:            }
1003:
1004:            public void setBlob(PreparedStatement stmt, int column,
1005:                    SpyMessage message) throws IOException, SQLException {
1006:                if (blobType == OBJECT_BLOB) {
1007:                    stmt.setObject(column, message);
1008:                } else if (blobType == BYTES_BLOB) {
1009:                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
1010:                    ObjectOutputStream oos = new ObjectOutputStream(baos);
1011:                    SpyMessage.writeMessage(message, oos);
1012:                    oos.flush();
1013:                    byte[] messageAsBytes = baos.toByteArray();
1014:                    stmt.setBytes(column, messageAsBytes);
1015:                } else if (blobType == BINARYSTREAM_BLOB) {
1016:                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
1017:                    ObjectOutputStream oos = new ObjectOutputStream(baos);
1018:                    SpyMessage.writeMessage(message, oos);
1019:                    oos.flush();
1020:                    byte[] messageAsBytes = baos.toByteArray();
1021:                    ByteArrayInputStream bais = new ByteArrayInputStream(
1022:                            messageAsBytes);
1023:                    stmt.setBinaryStream(column, bais, messageAsBytes.length);
1024:                } else if (blobType == BLOB_BLOB) {
1025:
1026:                    throw new RuntimeException(
1027:                            "BLOB_TYPE: BLOB_BLOB is not yet implemented.");
1028:                    /** TODO:
1029:                    ByteArrayOutputStream baos= new ByteArrayOutputStream();
1030:                    ObjectOutputStream oos= new ObjectOutputStream(baos);
1031:                    oos.writeObject(message);
1032:                    byte[] messageAsBytes= baos.toByteArray();
1033:                    ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
1034:                    stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
1035:                     */
1036:                }
1037:            }
1038:
1039:            public void setBlob(PreparedStatement stmt, int column, Xid xid)
1040:                    throws IOException, SQLException {
1041:                if (blobType == OBJECT_BLOB) {
1042:                    stmt.setObject(column, xid);
1043:                } else if (blobType == BYTES_BLOB) {
1044:                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
1045:                    ObjectOutputStream oos = new ObjectOutputStream(baos);
1046:                    oos.writeObject(xid);
1047:                    oos.flush();
1048:                    byte[] messageAsBytes = baos.toByteArray();
1049:                    stmt.setBytes(column, messageAsBytes);
1050:                } else if (blobType == BINARYSTREAM_BLOB) {
1051:                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
1052:                    ObjectOutputStream oos = new ObjectOutputStream(baos);
1053:                    oos.writeObject(xid);
1054:                    oos.flush();
1055:                    byte[] messageAsBytes = baos.toByteArray();
1056:                    ByteArrayInputStream bais = new ByteArrayInputStream(
1057:                            messageAsBytes);
1058:                    stmt.setBinaryStream(column, bais, messageAsBytes.length);
1059:                } else if (blobType == BLOB_BLOB) {
1060:
1061:                    throw new RuntimeException(
1062:                            "BLOB_TYPE: BLOB_BLOB is not yet implemented.");
1063:                    /** TODO:
1064:                    ByteArrayOutputStream baos= new ByteArrayOutputStream();
1065:                    ObjectOutputStream oos= new ObjectOutputStream(baos);
1066:                    oos.writeObject(xid);
1067:                    byte[] messageAsBytes= baos.toByteArray();
1068:                    ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
1069:                    stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
1070:                     */
1071:                }
1072:            }
1073:
1074:            /////////////////////////////////////////////////////////////////////////////////
1075:            //
1076:            // Updating a message
1077:            //
1078:            /////////////////////////////////////////////////////////////////////////////////
1079:            public void update(MessageReference messageRef, Tx txId)
1080:                    throws javax.jms.JMSException {
1081:                boolean trace = log.isTraceEnabled();
1082:                if (trace)
1083:                    log.trace("Updating message " + messageRef
1084:                            + " transaction=" + txId);
1085:
1086:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
1087:                tms.startTX();
1088:                Connection c = null;
1089:                PreparedStatement stmt = null;
1090:                boolean threadWasInterrupted = Thread.interrupted();
1091:                try {
1092:
1093:                    c = this .getConnection();
1094:                    if (txId == null) {
1095:
1096:                        stmt = c.prepareStatement(UPDATE_MESSAGE);
1097:                        setBlob(stmt, 1, messageRef.getMessage());
1098:                        stmt.setLong(2, messageRef.messageId);
1099:                        stmt.setString(3, messageRef.getPersistentKey());
1100:                        int rc = stmt.executeUpdate();
1101:                        if (rc != 1)
1102:                            throw new SpyJMSException(
1103:                                    "Could not update the message in the database: update affected "
1104:                                            + rc + " rows");
1105:                    } else {
1106:                        throw new SpyJMSException(
1107:                                "NYI: Updating a message in a transaction is not currently used");
1108:                    }
1109:                    if (trace)
1110:                        log.trace("Updated message " + messageRef
1111:                                + " transaction=" + txId);
1112:
1113:                } catch (IOException e) {
1114:                    tms.setRollbackOnly();
1115:                    throw new SpyJMSException("Could not update message: "
1116:                            + messageRef, e);
1117:                } catch (SQLException e) {
1118:                    tms.setRollbackOnly();
1119:                    throw new SpyJMSException("Could not update message: "
1120:                            + messageRef, e);
1121:                } finally {
1122:                    try {
1123:                        if (stmt != null)
1124:                            stmt.close();
1125:                    } catch (Throwable ignore) {
1126:                    }
1127:                    try {
1128:                        if (c != null)
1129:                            c.close();
1130:                    } catch (Throwable ignore) {
1131:                    }
1132:                    tms.endTX();
1133:
1134:                    // Restore the interrupted state of the thread
1135:                    if (threadWasInterrupted)
1136:                        Thread.currentThread().interrupt();
1137:                }
1138:
1139:            }
1140:
1141:            /////////////////////////////////////////////////////////////////////////////////
1142:            //
1143:            // Removing a message
1144:            //
1145:            /////////////////////////////////////////////////////////////////////////////////
1146:            public void remove(MessageReference messageRef, Tx txId)
1147:                    throws javax.jms.JMSException {
1148:                boolean trace = log.isTraceEnabled();
1149:                if (trace)
1150:                    log.trace("Removing message " + messageRef
1151:                            + " transaction=" + txId);
1152:
1153:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
1154:                tms.startTX();
1155:                Connection c = null;
1156:                PreparedStatement stmt = null;
1157:                boolean threadWasInterrupted = Thread.interrupted();
1158:                try {
1159:                    c = this .getConnection();
1160:
1161:                    // Lazily write the peristent transaction
1162:                    insertPersistentTx(tms, c, txId);
1163:
1164:                    // Synchronize on the message to avoid a race with the softener
1165:                    synchronized (messageRef) {
1166:                        if (txId == null) {
1167:                            stmt = c.prepareStatement(DELETE_MESSAGE);
1168:                            stmt.setLong(1, messageRef.messageId);
1169:                            stmt.setString(2, messageRef.getPersistentKey());
1170:
1171:                            // Adrian Brock:
1172:                            // Remove the message from the cache, but don't 
1173:                            // return it to the pool just yet. The queue still holds
1174:                            // a reference to the message and will return it
1175:                            // to the pool once it gets enough time slice.
1176:                            // The alternative is to remove the validation
1177:                            // for double removal from the cache, 
1178:                            // which I don't want to do because it is useful
1179:                            // for spotting errors
1180:                            messageRef.setStored(MessageReference.NOT_STORED);
1181:                            messageRef.removeDelayed();
1182:                        } else {
1183:                            stmt = c.prepareStatement(MARK_MESSAGE);
1184:                            stmt.setLong(1, txId.longValue());
1185:                            stmt.setString(2, "D");
1186:                            stmt.setLong(3, messageRef.messageId);
1187:                            stmt.setString(4, messageRef.getPersistentKey());
1188:                        }
1189:
1190:                        int tries = 0;
1191:                        while (true) {
1192:                            try {
1193:                                int rc = stmt.executeUpdate();
1194:
1195:                                if (tries > 0) {
1196:                                    if (rc != 1)
1197:                                        throw new SpyJMSException(
1198:                                                "Could not mark the message as deleted in the database: update affected "
1199:                                                        + rc + " rows."
1200:                                                        + CONCURRENCY_WARNING);
1201:
1202:                                    log.warn("Remove operation worked after "
1203:                                            + tries + " retries");
1204:                                }
1205:                                break;
1206:                            } catch (SQLException e) {
1207:                                log.warn(
1208:                                        "SQLException caught - assuming deadlock detected, try:"
1209:                                                + (tries + 1), e);
1210:                                tries++;
1211:                                if (tries >= statementRetries) {
1212:                                    log.error("Retried " + tries
1213:                                            + " times, now giving up");
1214:                                    throw new IllegalStateException(
1215:                                            "Could not remove message after "
1216:                                                    + tries + "attempts");
1217:                                }
1218:                                log.warn("Trying again after a pause");
1219:                                //Now we wait for a random amount of time to minimise risk of deadlock
1220:                                Thread.sleep((long) (Math.random() * 500));
1221:                            }
1222:                        }
1223:
1224:                        if (trace)
1225:                            log.trace("Removed message " + messageRef
1226:                                    + " transaction=" + txId);
1227:                    }
1228:                } catch (Exception e) {
1229:                    tms.setRollbackOnly();
1230:                    throw new SpyJMSException("Could not remove message: "
1231:                            + messageRef, e);
1232:                } finally {
1233:                    try {
1234:                        if (stmt != null)
1235:                            stmt.close();
1236:                    } catch (Throwable ignore) {
1237:                    }
1238:                    try {
1239:                        if (c != null)
1240:                            c.close();
1241:                    } catch (Throwable ignore) {
1242:                    }
1243:                    tms.endTX();
1244:
1245:                    // Restore the interrupted state of the thread
1246:                    if (threadWasInterrupted)
1247:                        Thread.currentThread().interrupt();
1248:                }
1249:
1250:            }
1251:
1252:            /////////////////////////////////////////////////////////////////////////////////
1253:            //
1254:            // Misc. PM functions
1255:            //
1256:            /////////////////////////////////////////////////////////////////////////////////
1257:
1258:            public TxManager getTxManager() {
1259:                return txManager;
1260:            }
1261:
1262:            public void closeQueue(JMSDestination jmsDest, SpyDestination dest)
1263:                    throws JMSException {
1264:                // Nothing to clean up, all the state is in the db.
1265:            }
1266:
1267:            public SpyMessage loadFromStorage(MessageReference messageRef)
1268:                    throws JMSException {
1269:                if (log.isTraceEnabled())
1270:                    log.trace("Loading message from storage " + messageRef);
1271:
1272:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
1273:                tms.startTX();
1274:                Connection c = null;
1275:                PreparedStatement stmt = null;
1276:                ResultSet rs = null;
1277:                boolean threadWasInterrupted = Thread.interrupted();
1278:                try {
1279:
1280:                    c = this .getConnection();
1281:                    stmt = c.prepareStatement(SELECT_MESSAGE);
1282:                    stmt.setLong(1, messageRef.messageId);
1283:                    stmt.setString(2, messageRef.getPersistentKey());
1284:
1285:                    rs = stmt.executeQuery();
1286:                    if (rs.next())
1287:                        return extractMessage(rs);
1288:                    else
1289:                        throw new SpyJMSException(
1290:                                "Could not load message from storage: "
1291:                                        + messageRef + " "
1292:                                        + CONCURRENCY_WARNING);
1293:
1294:                } catch (Exception e) {
1295:                    tms.setRollbackOnly();
1296:                    SpyJMSException.rethrowAsJMSException(
1297:                            "Could not load message : " + messageRef, e);
1298:                    throw new UnreachableStatementException();
1299:                } finally {
1300:                    try {
1301:                        if (rs != null)
1302:                            rs.close();
1303:                    } catch (Throwable ignore) {
1304:                    }
1305:                    try {
1306:                        if (stmt != null)
1307:                            stmt.close();
1308:                    } catch (Throwable ignore) {
1309:                    }
1310:                    try {
1311:                        if (c != null)
1312:                            c.close();
1313:                    } catch (Throwable ignore) {
1314:                    }
1315:                    tms.endTX();
1316:
1317:                    // Restore the interrupted state of the thread
1318:                    if (threadWasInterrupted)
1319:                        Thread.currentThread().interrupt();
1320:                }
1321:            }
1322:
1323:            /////////////////////////////////////////////////////////////////////////////////
1324:            //
1325:            // CacheStore Functions
1326:            //
1327:            /////////////////////////////////////////////////////////////////////////////////   
1328:            public void removeFromStorage(MessageReference messageRef)
1329:                    throws JMSException {
1330:                // We don't remove persistent messages sent to persistent queues
1331:                if (messageRef.isPersistent())
1332:                    return;
1333:
1334:                boolean trace = log.isTraceEnabled();
1335:                if (trace)
1336:                    log.trace("Removing message from storage " + messageRef);
1337:
1338:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
1339:                tms.startTX();
1340:                Connection c = null;
1341:                PreparedStatement stmt = null;
1342:                boolean threadWasInterrupted = Thread.interrupted();
1343:                try {
1344:                    c = this .getConnection();
1345:                    stmt = c.prepareStatement(DELETE_MESSAGE);
1346:                    stmt.setLong(1, messageRef.messageId);
1347:                    stmt.setString(2, messageRef.getPersistentKey());
1348:                    stmt.executeUpdate();
1349:                    messageRef.setStored(MessageReference.NOT_STORED);
1350:
1351:                    if (trace)
1352:                        log.trace("Removed message from storage " + messageRef);
1353:                } catch (SQLException e) {
1354:                    tms.setRollbackOnly();
1355:                    throw new SpyJMSException("Could not remove message: "
1356:                            + messageRef, e);
1357:                } finally {
1358:                    try {
1359:                        if (stmt != null)
1360:                            stmt.close();
1361:                    } catch (Throwable ignore) {
1362:                    }
1363:                    try {
1364:                        if (c != null)
1365:                            c.close();
1366:                    } catch (Throwable ignore) {
1367:                    }
1368:                    tms.endTX();
1369:
1370:                    // Restore the interrupted state of the thread
1371:                    if (threadWasInterrupted)
1372:                        Thread.currentThread().interrupt();
1373:                }
1374:            }
1375:
1376:            public void saveToStorage(MessageReference messageRef,
1377:                    SpyMessage message) throws JMSException {
1378:                // Ignore save operations for persistent messages sent to persistent queues
1379:                // The queues handle the persistence
1380:                if (messageRef.isPersistent())
1381:                    return;
1382:
1383:                boolean trace = log.isTraceEnabled();
1384:                if (trace)
1385:                    log.trace("Saving message to storage " + messageRef);
1386:
1387:                TransactionManagerStrategy tms = new TransactionManagerStrategy();
1388:                tms.startTX();
1389:                Connection c = null;
1390:                boolean threadWasInterrupted = Thread.interrupted();
1391:                try {
1392:
1393:                    c = this .getConnection();
1394:                    add(c, messageRef.getPersistentKey(), message, null, "T");
1395:                    messageRef.setStored(MessageReference.STORED);
1396:
1397:                    if (trace)
1398:                        log.trace("Saved message to storage " + messageRef);
1399:                } catch (IOException e) {
1400:                    tms.setRollbackOnly();
1401:                    throw new SpyJMSException("Could not store message: "
1402:                            + messageRef, e);
1403:                } catch (SQLException e) {
1404:                    tms.setRollbackOnly();
1405:                    throw new SpyJMSException("Could not store message: "
1406:                            + messageRef, e);
1407:                } finally {
1408:                    try {
1409:                        if (c != null)
1410:                            c.close();
1411:                    } catch (Throwable ignore) {
1412:                    }
1413:                    tms.endTX();
1414:
1415:                    // Restore the interrupted state of the thread
1416:                    if (threadWasInterrupted)
1417:                        Thread.currentThread().interrupt();
1418:                }
1419:            }
1420:
1421:            /**
1422:             * Gets a connection from the datasource, retrying as needed.  This was
1423:             * implemented because in some minimal configurations (i.e. little logging
1424:             * and few services) the database wasn't ready when we tried to get a
1425:             * connection.  We, therefore, implement a retry loop wich is controled
1426:             * by the ConnectionRetryAttempts attribute.  Submitted by terry@amicas.com
1427:             *
1428:             * @return the connection
1429:             * @exception SQLException if an error occurs.
1430:             */
1431:            protected Connection getConnection() throws SQLException {
1432:                int attempts = this .connectionRetryAttempts;
1433:                int attemptCount = 0;
1434:                SQLException sqlException = null;
1435:                while (attempts-- > 0) {
1436:                    if (++attemptCount > 1) {
1437:                        log.debug("Retrying connection: attempt # "
1438:                                + attemptCount);
1439:                    }
1440:                    try {
1441:                        sqlException = null;
1442:                        return datasource.getConnection();
1443:                    } catch (SQLException exception) {
1444:                        log.debug("Connection attempt # " + attemptCount
1445:                                + " failed with SQLException", exception);
1446:                        sqlException = exception;
1447:                    } finally {
1448:                        if (sqlException == null && attemptCount > 1) {
1449:                            log.debug("Connection succeeded on attempt # "
1450:                                    + attemptCount);
1451:                        }
1452:                    }
1453:
1454:                    if (attempts > 0) {
1455:                        try {
1456:                            Thread.sleep(1500);
1457:                        } catch (InterruptedException interruptedException) {
1458:                            break;
1459:                        }
1460:                    }
1461:                }
1462:                if (sqlException != null) {
1463:                    throw sqlException;
1464:                }
1465:                throw new SQLException("connection attempt interrupted");
1466:            }
1467:
1468:            /////////////////////////////////////////////////////////////////////////////////
1469:            //
1470:            // JMX Interface 
1471:            //
1472:            /////////////////////////////////////////////////////////////////////////////////
1473:
1474:            /** The object name of the DataSource */
1475:            protected ObjectName connectionManagerName;
1476:
1477:            /** The SQL properties */
1478:            protected Properties sqlProperties = new Properties();
1479:
1480:            public void startService() throws Exception {
1481:                UPDATE_MARKED_MESSAGES = sqlProperties.getProperty(
1482:                        "UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES);
1483:                UPDATE_MARKED_MESSAGES_XARECOVERY = sqlProperties.getProperty(
1484:                        "UPDATE_MARKED_MESSAGES_XARECOVERY",
1485:                        UPDATE_MARKED_MESSAGES_XARECOVERY);
1486:                UPDATE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty(
1487:                        "UPDATE_MARKED_MESSAGES_WITH_TX",
1488:                        UPDATE_MARKED_MESSAGES_WITH_TX);
1489:                DELETE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty(
1490:                        "DELETE_MARKED_MESSAGES_WITH_TX",
1491:                        DELETE_MARKED_MESSAGES_WITH_TX);
1492:                DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY = sqlProperties
1493:                        .getProperty(
1494:                                "DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY",
1495:                                DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY);
1496:                DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX);
1497:                DELETE_MARKED_MESSAGES = sqlProperties.getProperty(
1498:                        "DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES);
1499:                DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty(
1500:                        "DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES);
1501:                INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX);
1502:                INSERT_TX_XARECOVERY = sqlProperties.getProperty(
1503:                        "INSERT_TX_XARECOVERY", INSERT_TX_XARECOVERY);
1504:                DELETE_ALL_TX = sqlProperties.getProperty("DELETE_ALL_TX",
1505:                        DELETE_ALL_TX);
1506:                DELETE_ALL_TX_XARECOVERY = sqlProperties.getProperty(
1507:                        "DELETE_ALL_TX_XARECOVERY", DELETE_ALL_TX_XARECOVERY);
1508:                SELECT_ALL_TX_XARECOVERY = sqlProperties.getProperty(
1509:                        "SELECT_ALL_TX_XARECOVERY", SELECT_ALL_TX_XARECOVERY);
1510:                SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX",
1511:                        SELECT_MAX_TX);
1512:                SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty(
1513:                        "SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST);
1514:                SELECT_MESSAGES_IN_DEST_XARECOVERY = sqlProperties.getProperty(
1515:                        "SELECT_MESSAGES_IN_DEST_XARECOVERY",
1516:                        SELECT_MESSAGES_IN_DEST_XARECOVERY);
1517:                SELECT_MESSAGE_KEYS_IN_DEST = sqlProperties.getProperty(
1518:                        "SELECT_MESSAGE_KEYS_IN_DEST",
1519:                        SELECT_MESSAGE_KEYS_IN_DEST);
1520:                SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE",
1521:                        SELECT_MESSAGE);
1522:                SELECT_MESSAGE_XARECOVERY = sqlProperties.getProperty(
1523:                        "SELECT_MESSAGE_XARECOVERY", SELECT_MESSAGE_XARECOVERY);
1524:                INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE",
1525:                        INSERT_MESSAGE);
1526:                MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE",
1527:                        MARK_MESSAGE);
1528:                DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE",
1529:                        DELETE_MESSAGE);
1530:                UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE",
1531:                        UPDATE_MESSAGE);
1532:                CREATE_MESSAGE_TABLE = sqlProperties.getProperty(
1533:                        "CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE);
1534:                CREATE_IDX_MESSAGE_TXOP_TXID = sqlProperties.getProperty(
1535:                        "CREATE_IDX_MESSAGE_TXOP_TXID",
1536:                        CREATE_IDX_MESSAGE_TXOP_TXID);
1537:                CREATE_IDX_MESSAGE_DESTINATION = sqlProperties.getProperty(
1538:                        "CREATE_IDX_MESSAGE_DESTINATION",
1539:                        CREATE_IDX_MESSAGE_DESTINATION);
1540:                CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE",
1541:                        CREATE_TX_TABLE);
1542:                CREATE_TX_TABLE_XARECOVERY = sqlProperties.getProperty(
1543:                        "CREATE_TX_TABLE_XARECOVERY",
1544:                        CREATE_TX_TABLE_XARECOVERY);
1545:                createTables = sqlProperties.getProperty(
1546:                        "CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase(
1547:                        "true");
1548:                String s = sqlProperties
1549:                        .getProperty("BLOB_TYPE", "OBJECT_BLOB");
1550:
1551:                if (s.equals("OBJECT_BLOB")) {
1552:                    blobType = OBJECT_BLOB;
1553:                } else if (s.equals("BYTES_BLOB")) {
1554:                    blobType = BYTES_BLOB;
1555:                } else if (s.equals("BINARYSTREAM_BLOB")) {
1556:                    blobType = BINARYSTREAM_BLOB;
1557:                } else if (s.equals("BLOB_BLOB")) {
1558:                    blobType = BLOB_BLOB;
1559:                }
1560:
1561:                // initialize tm and datasource
1562:                initializeFields();
1563:
1564:                log.debug("Creating Schema");
1565:                try {
1566:                    createSchema();
1567:                } catch (Exception e) {
1568:                    log.warn("Error creating schema", e);
1569:                }
1570:
1571:                log.debug("Resolving uncommited TXS");
1572:                Throwable error = null;
1573:                for (int i = 0; i <= recoveryRetries; ++i) {
1574:                    try {
1575:                        resolveAllUncommitedTXs();
1576:
1577:                        // done
1578:                        break;
1579:                    } catch (Throwable t) {
1580:                        if (i < recoveryRetries)
1581:                            log.warn("Error resolving transactions retries="
1582:                                    + i + " of " + recoveryRetries, t);
1583:                        else
1584:                            error = t;
1585:                    }
1586:                }
1587:
1588:                if (error != null)
1589:                    SpyJMSException.rethrowAsJMSException(
1590:                            "Unable to resolve transactions retries="
1591:                                    + recoveryRetries, error);
1592:            }
1593:
1594:            protected void initializeFields() throws MBeanException,
1595:                    AttributeNotFoundException, InstanceNotFoundException,
1596:                    ReflectionException, NamingException {
1597:                //Find the ConnectionFactoryLoader MBean so we can find the datasource
1598:                String dsName = (String) getServer().getAttribute(
1599:                        connectionManagerName, "BindName");
1600:                //Get an InitialContext
1601:
1602:                InitialContext ctx = new InitialContext();
1603:                datasource = (DataSource) ctx.lookup(dsName);
1604:
1605:                //Get the Transaction Manager so we can control the jdbc tx
1606:                tm = (TransactionManager) ctx
1607:                        .lookup(TransactionManagerService.JNDI_NAME);
1608:            }
1609:
1610:            public Object getInstance() {
1611:                return this ;
1612:            }
1613:
1614:            public ObjectName getMessageCache() {
1615:                throw new UnsupportedOperationException(
1616:                        "This is now set on the destination manager");
1617:            }
1618:
1619:            public void setMessageCache(ObjectName messageCache) {
1620:                throw new UnsupportedOperationException(
1621:                        "This is now set on the destination manager");
1622:            }
1623:
1624:            public ObjectName getConnectionManager() {
1625:                return connectionManagerName;
1626:            }
1627:
1628:            public void setConnectionManager(ObjectName connectionManagerName) {
1629:                this .connectionManagerName = connectionManagerName;
1630:            }
1631:
1632:            public MessageCache getMessageCacheInstance() {
1633:                throw new UnsupportedOperationException(
1634:                        "This is now set on the destination manager");
1635:            }
1636:
1637:            public String getSqlProperties() {
1638:                try {
1639:                    ByteArrayOutputStream boa = new ByteArrayOutputStream();
1640:                    sqlProperties.store(boa, "");
1641:                    return new String(boa.toByteArray());
1642:                } catch (IOException shouldnothappen) {
1643:                    return "";
1644:                }
1645:            }
1646:
1647:            public void setSqlProperties(String value) {
1648:                try {
1649:                    ByteArrayInputStream is = new ByteArrayInputStream(value
1650:                            .getBytes());
1651:                    sqlProperties = new Properties();
1652:                    sqlProperties.load(is);
1653:                } catch (IOException shouldnothappen) {
1654:                }
1655:            }
1656:
1657:            public void setConnectionRetryAttempts(int value) {
1658:                this .connectionRetryAttempts = value;
1659:            }
1660:
1661:            public int getConnectionRetryAttempts() {
1662:                return this .connectionRetryAttempts;
1663:            }
1664:
1665:            public int getRecoveryTimeout() {
1666:                return recoveryTimeout;
1667:            }
1668:
1669:            public void setRecoveryTimeout(int timeout) {
1670:                this .recoveryTimeout = timeout;
1671:            }
1672:
1673:            public int getRecoveryRetries() {
1674:                return recoveryRetries;
1675:            }
1676:
1677:            public void setRecoveryRetries(int retries) {
1678:                this .recoveryRetries = retries;
1679:            }
1680:
1681:            public int getRecoverMessagesChunk() {
1682:                return recoverMessagesChunk;
1683:            }
1684:
1685:            public void setRecoverMessagesChunk(int recoverMessagesChunk) {
1686:                if (recoverMessagesChunk != 0 && recoverMessagesChunk != 1) {
1687:                    log
1688:                            .warn("Only the values 0 and 1 are currently support for chunk size, using chunk size=1");
1689:                    recoverMessagesChunk = 1;
1690:                }
1691:                this .recoverMessagesChunk = recoverMessagesChunk;
1692:            }
1693:
1694:            public boolean isXARecovery() {
1695:                return xaRecovery;
1696:            }
1697:
1698:            public void setXARecovery(boolean xaRecovery) {
1699:                this .xaRecovery = xaRecovery;
1700:            }
1701:
1702:            public int getStatementRetries() {
1703:                return statementRetries;
1704:            }
1705:
1706:            public void setStatementRetries(int statementRetries) {
1707:                if (statementRetries < 0)
1708:                    statementRetries = 0;
1709:                this.statementRetries = statementRetries;
1710:            }
1711:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.