001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.sm.jdbc;
023:
024: import java.io.ByteArrayInputStream;
025: import java.io.ByteArrayOutputStream;
026: import java.io.IOException;
027: import java.sql.Connection;
028: import java.sql.PreparedStatement;
029: import java.sql.ResultSet;
030: import java.sql.SQLException;
031: import java.sql.Statement;
032: import java.util.ArrayList;
033: import java.util.Collection;
034: import java.util.HashSet;
035: import java.util.Iterator;
036: import java.util.List;
037: import java.util.Map;
038: import java.util.Properties;
039:
040: import javax.jms.InvalidClientIDException;
041: import javax.jms.JMSException;
042: import javax.jms.JMSSecurityException;
043: import javax.management.ObjectName;
044: import javax.naming.InitialContext;
045: import javax.sql.DataSource;
046: import javax.transaction.Status;
047: import javax.transaction.Transaction;
048: import javax.transaction.TransactionManager;
049:
050: import org.jboss.logging.Logger;
051: import org.jboss.mq.DurableSubscriptionID;
052: import org.jboss.mq.SpyJMSException;
053: import org.jboss.mq.SpyTopic;
054: import org.jboss.mq.sm.AbstractStateManager;
055: import org.jboss.mq.sm.StateManager;
056: import org.jboss.tm.TransactionManagerService;
057:
058: /**
059: * A state manager that stores state in the database.
060: *
061: * @jmx:mbean extends="org.jboss.mq.sm.AbstractStateManagerMBean"
062: * @todo add support for jmx operations to maintain the database
063: * @todo create indices
064: *
065: * @author Adrian Brock (Adrian@jboss.org)
066: * @author Ivelin Ivanov (ivelin@jboss.org)
067: * @version $Revision: 61855 $
068: */
069: public class JDBCStateManager extends AbstractStateManager implements
070: JDBCStateManagerMBean {
071: static final Logger log = Logger.getLogger(JDBCStateManager.class);
072:
073: /** The connection manager */
074: private ObjectName connectionManagerName;
075:
076: /** The data source */
077: protected DataSource dataSource;
078:
079: /** The connection retries */
080: protected int connectionRetryAttempts = 5;
081:
082: /** Whether there is a security manager */
083: private boolean hasSecurityManager = true;
084:
085: /** The transaction manager */
086: protected TransactionManager tm;
087:
088: /** The sql properties */
089: protected Properties sqlProperties = new Properties();
090:
091: /** Whether to create tables */
092: protected boolean createTables = true;
093:
094: /** Create the user table */
095: protected String CREATE_USER_TABLE = "CREATE TABLE JMS_USERS (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128),"
096: + " PRIMARY KEY(USERID))";
097:
098: /** Create the role table */
099: protected String CREATE_ROLE_TABLE = "CREATE TABLE JMS_ROLES (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL,"
100: + " PRIMARY KEY(USERID, ROLEID))";
101:
102: protected String CREATE_SUBSCRIPTION_TABLE = "CREATE TABLE JMS_SUBSCRIPTIONS (CLIENTID VARCHAR(128) NOT NULL, NAME VARCHAR(128) NOT NULL,"
103: + " TOPIC VARCHAR(255) NOT NULL, SELECTOR VARCHAR(255),"
104: + " PRIMARY KEY(CLIENTID, NAME))";
105:
106: /** Get a subscription */
107: protected String GET_SUBSCRIPTION = "SELECT TOPIC, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND NAME=?";
108:
109: /** Get subscriptions for a topic */
110: protected String GET_SUBSCRIPTIONS_FOR_TOPIC = "SELECT CLIENTID, NAME, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE TOPIC=?";
111:
112: /** Lock a subscription */
113: protected String LOCK_SUBSCRIPTION = "SELECT TOPIC, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND NAME=?";
114:
115: /** Insert a subscription */
116: protected String INSERT_SUBSCRIPTION = "INSERT INTO JMS_SUBSCRIPTIONS (CLIENTID, NAME, TOPIC, SELECTOR) VALUES(?,?,?,?)";
117:
118: /** Update a subscription */
119: protected String UPDATE_SUBSCRIPTION = "UPDATE JMS_SUBSCRIPTIONS SET TOPIC=?, SELECTOR=? WHERE CLIENTID=? AND NAME=?";
120:
121: /** Remove a subscription */
122: protected String REMOVE_SUBSCRIPTION = "DELETE FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND NAME=?";
123:
124: /** Get a user with the given client id */
125: protected String GET_USER_BY_CLIENTID = "SELECT USERID, PASSWD, CLIENTID FROM JMS_USERS WHERE CLIENTID=?";
126:
127: /** Get a user with the given user id */
128: protected String GET_USER = "SELECT PASSWD, CLIENTID FROM JMS_USERS WHERE USERID=?";
129:
130: /** Populate tables with initial data */
131: protected List POPULATE_TABLES = new ArrayList();
132:
133: public ObjectName getConnectionManager() {
134: return connectionManagerName;
135: }
136:
137: public void setConnectionManager(ObjectName connectionManagerName) {
138: this .connectionManagerName = connectionManagerName;
139: }
140:
141: public boolean hasSecurityManager() {
142: return hasSecurityManager;
143: }
144:
145: public void setHasSecurityManager(boolean hasSecurityManager) {
146: this .hasSecurityManager = hasSecurityManager;
147: }
148:
149: public String getSqlProperties() {
150: try {
151: ByteArrayOutputStream boa = new ByteArrayOutputStream();
152: sqlProperties.store(boa, "");
153: return new String(boa.toByteArray());
154: } catch (IOException shouldnothappen) {
155: return "";
156: }
157: }
158:
159: public void setSqlProperties(String value) {
160: try {
161:
162: ByteArrayInputStream is = new ByteArrayInputStream(value
163: .getBytes());
164: sqlProperties = new Properties();
165: sqlProperties.load(is);
166:
167: } catch (IOException shouldnothappen) {
168: }
169: }
170:
171: public void setConnectionRetryAttempts(int value) {
172: this .connectionRetryAttempts = value;
173: }
174:
175: public int getConnectionRetryAttempts() {
176: return this .connectionRetryAttempts;
177: }
178:
179: protected DurableSubscription getDurableSubscription(
180: DurableSubscriptionID sub) throws JMSException {
181: JDBCSession session = new JDBCSession();
182: try {
183: PreparedStatement statement = session
184: .prepareStatement(GET_SUBSCRIPTION);
185: statement.setString(1, sub.getClientID());
186: statement.setString(2, sub.getSubscriptionName());
187: ResultSet rs = statement.executeQuery();
188: session.addResultSet(rs);
189: if (rs.next() == false)
190: return null;
191:
192: return new DurableSubscription(sub.getClientID(), sub
193: .getSubscriptionName(), rs.getString(1), rs
194: .getString(2));
195: } catch (SQLException e) {
196: session.setRollbackOnly();
197: throw new SpyJMSException(
198: "Error getting durable subscription " + sub, e);
199: } finally {
200: session.close();
201: }
202: }
203:
204: protected void saveDurableSubscription(DurableSubscription ds)
205: throws JMSException {
206: JDBCSession session = new JDBCSession();
207: try {
208: PreparedStatement statement = session
209: .prepareStatement(LOCK_SUBSCRIPTION);
210: statement.setString(1, ds.getClientID());
211: statement.setString(2, ds.getName());
212: ResultSet rs = statement.executeQuery();
213: session.addResultSet(rs);
214: if (rs.next() == false) {
215: statement = session
216: .prepareStatement(INSERT_SUBSCRIPTION);
217: statement.setString(1, ds.getClientID());
218: statement.setString(2, ds.getName());
219: statement.setString(3, ds.getTopic());
220: statement.setString(4, ds.getSelector());
221: } else {
222: statement = session
223: .prepareStatement(UPDATE_SUBSCRIPTION);
224: statement.setString(1, ds.getTopic());
225: statement.setString(2, ds.getSelector());
226: statement.setString(3, ds.getClientID());
227: statement.setString(4, ds.getName());
228: }
229: if (statement.executeUpdate() != 1) {
230: session.setRollbackOnly();
231: throw new SpyJMSException("Insert subscription failed "
232: + ds);
233: }
234: } catch (SQLException e) {
235: session.setRollbackOnly();
236: throw new SpyJMSException(
237: "Error saving durable subscription " + ds, e);
238: } finally {
239: session.close();
240: }
241: }
242:
243: protected void removeDurableSubscription(DurableSubscription ds)
244: throws JMSException {
245: JDBCSession session = new JDBCSession();
246: try {
247: PreparedStatement statement = session
248: .prepareStatement(REMOVE_SUBSCRIPTION);
249: statement.setString(1, ds.getClientID());
250: statement.setString(2, ds.getName());
251: if (statement.executeUpdate() != 1)
252: throw new JMSException(
253: "Durable subscription does not exist " + ds);
254: } catch (SQLException e) {
255: session.setRollbackOnly();
256: throw new SpyJMSException(
257: "Error removing durable subscription " + ds, e);
258: } finally {
259: session.close();
260: }
261: }
262:
263: public Collection getDurableSubscriptionIdsForTopic(SpyTopic topic)
264: throws JMSException {
265: ArrayList result = new ArrayList();
266:
267: JDBCSession session = new JDBCSession();
268: try {
269: PreparedStatement statement = session
270: .prepareStatement(GET_SUBSCRIPTIONS_FOR_TOPIC);
271: statement.setString(1, topic.getName());
272: ResultSet rs = statement.executeQuery();
273: session.addResultSet(rs);
274: while (rs.next()) {
275: result.add(new DurableSubscriptionID(rs.getString(1),
276: rs.getString(2), rs.getString(3)));
277: }
278:
279: return result;
280: } catch (SQLException e) {
281: session.setRollbackOnly();
282: throw new SpyJMSException(
283: "Error getting durable subscriptions for topic "
284: + topic, e);
285: } finally {
286: session.close();
287: }
288: }
289:
290: protected void checkLoggedOnClientId(String clientID)
291: throws JMSException {
292: JDBCSession session = new JDBCSession();
293: try {
294: PreparedStatement statement = session
295: .prepareStatement(GET_USER_BY_CLIENTID);
296: statement.setString(1, clientID);
297: ResultSet rs = statement.executeQuery();
298: session.addResultSet(rs);
299: if (rs.next())
300: throw new InvalidClientIDException(
301: "This client id is password protected "
302: + clientID);
303: } catch (SQLException e) {
304: session.setRollbackOnly();
305: throw new SpyJMSException(
306: "Error checking logged on client id " + clientID, e);
307: } finally {
308: session.close();
309: }
310: }
311:
312: protected String getPreconfClientId(String logon, String passwd)
313: throws JMSException {
314: JDBCSession session = new JDBCSession();
315: try {
316: PreparedStatement statement = session
317: .prepareStatement(GET_USER);
318: statement.setString(1, logon);
319: ResultSet rs = statement.executeQuery();
320: session.addResultSet(rs);
321: if (rs.next() == false) {
322: if (hasSecurityManager)
323: return null;
324: else
325: throw new JMSSecurityException(
326: "This user does not exist " + logon);
327: }
328:
329: if (hasSecurityManager == false
330: && passwd.equals(rs.getString(1)) == false)
331: throw new JMSSecurityException("Bad password for user "
332: + logon);
333:
334: return rs.getString(2);
335: } catch (SQLException e) {
336: session.setRollbackOnly();
337: throw new SpyJMSException(
338: "Error retrieving preconfigured user " + logon, e);
339: } finally {
340: session.close();
341: }
342: }
343:
344: public StateManager getInstance() {
345: return this ;
346: }
347:
348: protected void startService() throws Exception {
349: if (connectionManagerName == null)
350: throw new IllegalStateException(
351: "No connection manager configured");
352:
353: //Find the ConnectionFactoryLoader MBean so we can find the datasource
354: String dsName = (String) getServer().getAttribute(
355: connectionManagerName, "BindName");
356:
357: InitialContext ctx = new InitialContext();
358: try {
359: dataSource = (DataSource) ctx.lookup(dsName);
360: tm = (TransactionManager) ctx
361: .lookup(TransactionManagerService.JNDI_NAME);
362: } finally {
363: ctx.close();
364: }
365:
366: try {
367: initDB();
368: } catch (Exception e) {
369: log.warn("Error initialising state manager db", e);
370: }
371: }
372:
373: protected void initDB() throws Exception {
374: CREATE_USER_TABLE = sqlProperties.getProperty(
375: "CREATE_USER_TABLE", CREATE_USER_TABLE);
376: CREATE_ROLE_TABLE = sqlProperties.getProperty(
377: "CREATE_ROLE_TABLE", CREATE_ROLE_TABLE);
378: CREATE_SUBSCRIPTION_TABLE = sqlProperties.getProperty(
379: "CREATE_SUBSCRIPTION_TABLE", CREATE_SUBSCRIPTION_TABLE);
380: GET_SUBSCRIPTION = sqlProperties.getProperty(
381: "GET_SUBSCRIPTION", GET_SUBSCRIPTION);
382: GET_SUBSCRIPTIONS_FOR_TOPIC = sqlProperties.getProperty(
383: "GET_SUBSCRIPTIONS_FOR_TOPIC",
384: GET_SUBSCRIPTIONS_FOR_TOPIC);
385: LOCK_SUBSCRIPTION = sqlProperties.getProperty(
386: "LOCK_SUBSCRIPTION", LOCK_SUBSCRIPTION);
387: INSERT_SUBSCRIPTION = sqlProperties.getProperty(
388: "INSERT_SUBSCRIPTION", INSERT_SUBSCRIPTION);
389: UPDATE_SUBSCRIPTION = sqlProperties.getProperty(
390: "UPDATE_SUBSCRIPTION", UPDATE_SUBSCRIPTION);
391: REMOVE_SUBSCRIPTION = sqlProperties.getProperty(
392: "REMOVE_SUBSCRIPTION", REMOVE_SUBSCRIPTION);
393: GET_USER_BY_CLIENTID = sqlProperties.getProperty(
394: "GET_USER_BY_CLIENTID", GET_USER_BY_CLIENTID);
395: GET_USER = sqlProperties.getProperty("GET_USER", GET_USER);
396:
397: // Read the queries to populate the tables with initial data
398: for (Iterator i = sqlProperties.entrySet().iterator(); i
399: .hasNext();) {
400: Map.Entry entry = (Map.Entry) i.next();
401: String key = (String) entry.getKey();
402: if (key.startsWith("POPULATE.TABLES."))
403: POPULATE_TABLES.add(entry.getValue());
404: }
405:
406: String createString = sqlProperties
407: .getProperty("CREATE_TABLES_ON_START_UP");
408: if (createString == null)
409: createString = sqlProperties
410: .getProperty("CREATE_TABLES_ON_STARTUP");
411: if (createString == null)
412: createTables = true;
413: else
414: createTables = createString.trim().equalsIgnoreCase("true");
415:
416: if (createTables) {
417: JDBCSession session = new JDBCSession();
418: try {
419: PreparedStatement statement;
420: try {
421: statement = session
422: .prepareStatement(CREATE_USER_TABLE);
423: statement.executeUpdate();
424: } catch (SQLException ignored) {
425: log.trace("Error creating table: "
426: + CREATE_USER_TABLE, ignored);
427: }
428: try {
429: statement = session
430: .prepareStatement(CREATE_ROLE_TABLE);
431: statement.executeUpdate();
432: } catch (SQLException ignored) {
433: log.trace("Error creating table: "
434: + CREATE_ROLE_TABLE, ignored);
435: }
436: try {
437: statement = session
438: .prepareStatement(CREATE_SUBSCRIPTION_TABLE);
439: statement.executeUpdate();
440: } catch (SQLException ignored) {
441: log.trace("Error creating table: "
442: + CREATE_SUBSCRIPTION_TABLE, ignored);
443: }
444:
445: Iterator iter = POPULATE_TABLES.iterator();
446: String nextQry = null;
447: while (iter.hasNext()) {
448: try {
449: nextQry = (String) iter.next();
450: statement = session.prepareStatement(nextQry);
451: statement.execute();
452: } catch (SQLException ignored) {
453: log.trace(
454: "Error populating tables: " + nextQry,
455: ignored);
456: }
457: }
458: } finally {
459: session.close();
460: }
461: }
462: }
463:
464: /**
465: * This inner class helps handle the jdbc connections.
466: */
467: class JDBCSession {
468: boolean trace = log.isTraceEnabled();
469:
470: Transaction threadTx;
471:
472: Connection connection;
473:
474: HashSet statements = new HashSet();
475:
476: HashSet resultSets = null;
477:
478: JDBCSession() throws JMSException {
479: try {
480: // Suspend any previous transaction
481: threadTx = tm.suspend();
482: try {
483: // Always begin a transaction
484: tm.begin();
485: try {
486: // Retrieve a connection
487: connection = getConnection();
488: } catch (Throwable t) {
489: // Rollback the previously started transaction
490: try {
491: tm.rollback();
492: } catch (Throwable ignored) {
493: log.warn("Unable to rollback transaction",
494: ignored);
495: }
496: throw t;
497: }
498: } catch (Throwable t) {
499: // Resume the previous transaction
500: try {
501: if (threadTx != null)
502: tm.resume(threadTx);
503: } catch (Throwable ignored) {
504: log.warn("Unable to resume transaction "
505: + threadTx, ignored);
506: }
507: throw t;
508: }
509: } catch (Throwable t) {
510: throw new SpyJMSException(
511: "Error creating connection to the database.", t);
512: }
513: }
514:
515: PreparedStatement prepareStatement(String sql)
516: throws SQLException {
517: PreparedStatement result = connection.prepareStatement(sql);
518: statements.add(result);
519: return result;
520: }
521:
522: void setRollbackOnly() throws JMSException {
523: try {
524: tm.setRollbackOnly();
525: } catch (Exception e) {
526: throw new SpyJMSException(
527: "Could not mark the transaction for rollback.",
528: e);
529: }
530: }
531:
532: void addResultSet(ResultSet rs) {
533: if (resultSets == null)
534: resultSets = new HashSet();
535: resultSets.add(rs);
536: }
537:
538: void close() throws JMSException {
539: if (resultSets != null) {
540: for (Iterator i = resultSets.iterator(); i.hasNext();) {
541: ResultSet rs = (ResultSet) i.next();
542: try {
543: rs.close();
544: } catch (Throwable ignored) {
545: if (trace)
546: log.trace("Unable to close result set",
547: ignored);
548: }
549: }
550: }
551:
552: for (Iterator i = statements.iterator(); i.hasNext();) {
553: Statement s = (Statement) i.next();
554: try {
555: s.close();
556: } catch (Throwable ignored) {
557: if (trace)
558: log.trace("Unable to close statement", ignored);
559: }
560: }
561:
562: try {
563: if (connection != null)
564: connection.close();
565: } catch (Throwable ignored) {
566: if (trace)
567: log.trace("Unable to close connection", ignored);
568: }
569:
570: try {
571: if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
572: tm.rollback();
573: } else {
574: tm.commit();
575: }
576: } catch (Exception e) {
577: throw new SpyJMSException(
578: "Could not commit/rollback a transaction with the transaction manager.",
579: e);
580: } finally {
581: try {
582: if (threadTx != null)
583: tm.resume(threadTx);
584: } catch (Throwable ignored) {
585: log.warn(
586: "Unable to resume transaction " + threadTx,
587: ignored);
588: }
589: }
590: }
591:
592: /**
593: * Gets a connection from the datasource, retrying as needed. This was
594: * implemented because in some minimal configurations (i.e. little logging
595: * and few services) the database wasn't ready when we tried to get a
596: * connection. We, therefore, implement a retry loop wich is controled
597: * by the ConnectionRetryAttempts attribute. Submitted by terry@amicas.com
598: *
599: * @exception SQLException if an error occurs.
600: */
601: protected Connection getConnection() throws SQLException {
602: int attempts = connectionRetryAttempts;
603: int attemptCount = 0;
604: SQLException sqlException = null;
605: while (attempts-- > 0) {
606: if (++attemptCount > 1) {
607: log.debug("Retrying connection: attempt # "
608: + attemptCount);
609: }
610: try {
611: sqlException = null;
612: return dataSource.getConnection();
613: } catch (SQLException exception) {
614: log.debug("Connection attempt # " + attemptCount
615: + " failed with SQLException", exception);
616: sqlException = exception;
617: } finally {
618: if (sqlException == null && attemptCount > 1) {
619: log.debug("Connection succeeded on attempt # "
620: + attemptCount);
621: }
622: }
623:
624: if (attempts > 0) {
625: try {
626: Thread.sleep(1500);
627: } catch (InterruptedException interruptedException) {
628: break;
629: }
630: }
631: }
632: if (sqlException != null) {
633: throw sqlException;
634: }
635: throw new SQLException("connection attempt interrupted");
636: }
637: }
638: }
|