0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Copyright (C) 2005-2006 Continuent, Inc.
0007: * Contact: sequoia@continuent.org
0008: *
0009: * Licensed under the Apache License, Version 2.0 (the "License");
0010: * you may not use this file except in compliance with the License.
0011: * You may obtain a copy of the License at
0012: *
0013: * http://www.apache.org/licenses/LICENSE-2.0
0014: *
0015: * Unless required by applicable law or agreed to in writing, software
0016: * distributed under the License is distributed on an "AS IS" BASIS,
0017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018: * See the License for the specific language governing permissions and
0019: * limitations under the License.
0020: *
0021: * Initial developer(s): Emmanuel Cecchet.
0022: * Contributor(s): Mathieu Peltier, Sara Bouchenak, Jean-Bernard van Zuylen, Guillaume Smet.
0023: */package org.continuent.sequoia.controller.backend;
0024:
0025: import java.io.StringReader;
0026: import java.net.ConnectException;
0027: import java.sql.Connection;
0028: import java.sql.SQLException;
0029: import java.sql.SQLWarning;
0030: import java.sql.Savepoint;
0031: import java.sql.Statement;
0032: import java.util.ArrayList;
0033: import java.util.Collection;
0034: import java.util.Date;
0035: import java.util.HashMap;
0036: import java.util.Iterator;
0037: import java.util.List;
0038: import java.util.Map;
0039: import java.util.SortedSet;
0040: import java.util.Vector;
0041:
0042: import javax.management.AttributeChangeNotification;
0043: import javax.management.MalformedObjectNameException;
0044: import javax.management.Notification;
0045: import javax.management.NotificationBroadcasterSupport;
0046:
0047: import org.continuent.sequoia.common.exceptions.NoTransactionStartWhenDisablingException;
0048: import org.continuent.sequoia.common.exceptions.UnreachableBackendException;
0049: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
0050: import org.continuent.sequoia.common.i18n.Translate;
0051: import org.continuent.sequoia.common.jmx.JmxConstants;
0052: import org.continuent.sequoia.common.jmx.management.BackendInfo;
0053: import org.continuent.sequoia.common.jmx.management.BackendState;
0054: import org.continuent.sequoia.common.jmx.monitoring.backend.BackendStatistics;
0055: import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList;
0056: import org.continuent.sequoia.common.log.Trace;
0057: import org.continuent.sequoia.common.sql.metadata.MetadataContainer;
0058: import org.continuent.sequoia.common.sql.schema.DatabaseProcedure;
0059: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
0060: import org.continuent.sequoia.common.sql.schema.DatabaseSchema;
0061: import org.continuent.sequoia.common.sql.schema.DatabaseTable;
0062: import org.continuent.sequoia.common.users.VirtualDatabaseUser;
0063: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0064: import org.continuent.sequoia.common.xml.XmlComponent;
0065: import org.continuent.sequoia.controller.backend.rewriting.AbstractRewritingRule;
0066: import org.continuent.sequoia.controller.connection.AbstractConnectionManager;
0067: import org.continuent.sequoia.controller.connection.DriverManager;
0068: import org.continuent.sequoia.controller.connection.FailFastPoolConnectionManager;
0069: import org.continuent.sequoia.controller.connection.PooledConnection;
0070: import org.continuent.sequoia.controller.connection.RandomWaitPoolConnectionManager;
0071: import org.continuent.sequoia.controller.connection.SimpleConnectionManager;
0072: import org.continuent.sequoia.controller.connection.VariablePoolConnectionManager;
0073: import org.continuent.sequoia.controller.core.ControllerConstants;
0074: import org.continuent.sequoia.controller.jmx.MBeanServerManager;
0075: import org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer;
0076: import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
0077: import org.continuent.sequoia.controller.loadbalancer.BackendWorkerThread;
0078: import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
0079: import org.continuent.sequoia.controller.loadbalancer.tasks.KillThreadTask;
0080: import org.continuent.sequoia.controller.requests.AbstractRequest;
0081: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0082: import org.continuent.sequoia.controller.requests.CreateRequest;
0083: import org.continuent.sequoia.controller.requests.DropRequest;
0084: import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
0085: import org.dom4j.Document;
0086: import org.dom4j.Element;
0087: import org.dom4j.io.SAXReader;
0088:
0089: /**
0090: * A <code>DatabaseBackend</code> represents a real database backend that will
0091: * have to be bound to a virtual Sequoia database. All connections opened will
0092: * use the same url but possibly different login/password.
0093: *
0094: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0095: * @author <a href="mailto:Mathieu.Peltier@inrialpes.fr">Mathieu Peltier </a>
0096: * @author <a href="mailto:Sara.Bouchenak@epfl.ch">Sara Bouchenak </a>
0097: * @author <a href="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
0098: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0099: * </a>
0100: * @author <a href="mailto:guillaume.smet@gmail.com">Guillaume Smet</a>
0101: * @version 1.0
0102: */
0103: public final class DatabaseBackend implements XmlComponent {
0104: //
0105: // How the code is organized?
0106: // 1. Member variables
0107: // 2. Constructor(s)
0108: // 3. Connection management
0109: // 4. Transaction management
0110: // 5. State management
0111: // 6. Schema management
0112: // 7. Monitoring
0113: // 8. Getter/Setter (possibly in alphabetical order)
0114: // 9. Worker threads management
0115: //
0116:
/** Logical name assigned to this backend. */
private String name;

/** Path for the driver. The constructor does not reject <code>null</code>. */
private String driverPath;

/** Database native JDBC driver class name. */
private String driverClassName;

/** Driver compliance to Sequoia requirements. */
private transient DriverCompliance driverCompliance;

/** Real URL to access the database (JDBC URL). */
private String url;

/** Name of the virtual database this backend is attached to. */
private String virtualDatabaseName;

/** <code>true</code> if this backend is allowed to be enabled for writes. */
private boolean writeCanBeEnabled;

/** SQL statement used to check if a connection is still valid. */
private String connectionTestStatement;

/**
 * The schema of the database. This should be accessed in a synchronized(this)
 * block since it can be updated dynamically.
 */
private transient DatabaseSchema schema;

/** <code>true</code> if schema is static. */
private boolean schemaIsStatic = false;

/**
 * <code>true</code> if the backend must maintain its schema dynamically for
 * the needs of the virtual database.
 */
private boolean schemaIsNeededByVdb = true;

/** <code>true</code> if the schema is no longer up to date and needs a refresh. */
private boolean schemaIsDirty = true;

/**
 * Connection managers for this backend, keyed by virtual login
 * (<vLogin,AbstractConnectionManager>).
 */
private transient Map connectionManagers;

/**
 * Persistent connections active for this backend
 * (<persistentConnectionId as Long,PooledConnection>). Mutations are done
 * while holding the monitor of this map.
 */
private Map persistentConnections;

/** Logger instance, named after the virtual database and the backend. */
protected transient Trace logger;

/**
 * ArrayList<Long> of active transactions on this backend (as opposed to
 * the transactions started on the virtual database managing this backend).
 */
private transient ArrayList activeTransactions = new ArrayList();

/** List of savepoints for each transaction. */
private transient Map savepoints = new HashMap();

/** List of pending requests. */
private transient Vector pendingRequests = new Vector();

/** List of pending tasks. */
private transient Vector pendingTasks = new Vector();

/** Task queues to handle writes in RAIDb-1 or RAIDb-2. */
private transient BackendTaskQueues taskQueues = null;

/**
 * Number of BackendWorkerThread used to process writes in parallel. The
 * constructor overwrites this default and raises values below 2 to 2.
 */
private int nbOfWorkerThreads = 5;

/** List of BackendWorkerThread to execute write queries in RAIDb-1 or RAIDb-2. */
private ArrayList workerThreads = null;

// NOTE(review): presumably the lock guarding workerThreads; its use is not
// visible in this part of the file -- confirm.
private final Object workerThreadSync = new Object();

// Monitoring values. The constructor resets totalRequest to 0.
private int totalRequest;
private int totalWriteRequest;
private int totalReadRequest;
private int totalTransactions;

/** List of <code>AbstractRewritingRule</code> objects. */
private ArrayList rewritingRules;

// For metadata information generation.
/** Dynamic schema precision, see <code>DatabaseBackendSchemaConstants</code>. */
private int dynamicPrecision;
private boolean gatherSystemTables = false;
private DatabaseProcedureSemantic defaultStoredProcedureSemantic;
private HashMap storedProcedureSemantics = new HashMap();

private String schemaName = null;

/**
 * Length of the short form of SQL statements to include in traces and
 * exceptions (presumably a number of characters -- confirm against users).
 */
private int sqlShortFormLength = 40;

// NOTE(review): name suggests the last checkpoint recorded for this backend;
// not read or written in the visible part of the file -- confirm.
private String lastKnownCheckpoint;

/**
 * The current state of the backend. A new backend starts as
 * <code>BackendState.DISABLED</code>.
 *
 * @see org.continuent.sequoia.common.jmx.management.BackendState
 */
private int state = BackendState.DISABLED;

// NOTE(review): listener, JMX broadcaster and sequence counter are presumably
// used when the backend state changes; usage is outside this chunk.
private transient BackendStateListener stateListener;

private NotificationBroadcasterSupport notificationBroadcaster;

private int notificationSequence = 0;

private int totalTasks;

/**
 * Template connection manager used for the transparent login feature. When
 * left <code>null</code>, addDefaultConnectionManager creates one on first
 * use.
 */
private AbstractConnectionManager defaultConnectionManager = null;

/* end user logger */
static Trace endUserLogger = Trace
        .getLogger("org.continuent.sequoia.enduser");
0232:
/**
 * Creates a new <code>DatabaseBackend</code> instance.
 * <p>
 * The backend is created without any connection manager; they have to be
 * registered afterwards with
 * {@link #addConnectionManager(String, AbstractConnectionManager)}.
 *
 * @param name logical name assigned to this backend
 * @param driverPath path for driver (not validated, may be
 *          <code>null</code>)
 * @param driverClassName class name of the database native JDBC driver to
 *          load
 * @param url URL to access the database
 * @param vdbName Name of the virtual database this backend is attached to
 * @param writeCanBeEnabled if writes can be enabled on this backend
 * @param connectionTestStatement SQL statement used to check if a connection
 *          is still valid
 * @param nbOfWorkerThreads defines the number of BackendWorkerThread that
 *          should process writes in parallel (minimum is 2, smaller values
 *          are raised to 2 and a warning is logged)
 * @throws IllegalArgumentException if <code>name</code>,
 *           <code>driverClassName</code>, <code>url</code>,
 *           <code>vdbName</code> or <code>connectionTestStatement</code> is
 *           <code>null</code>
 */
public DatabaseBackend(String name, String driverPath,
        String driverClassName, String url, String vdbName,
        boolean writeCanBeEnabled, String connectionTestStatement,
        int nbOfWorkerThreads) {
    if (name == null) {
        throw new IllegalArgumentException(Translate
                .get("backend.null.backend.name"));
    }

    if (driverClassName == null) {
        throw new IllegalArgumentException(Translate
                .get("backend.null.driver"));
    }

    if (url == null) {
        throw new IllegalArgumentException(Translate
                .get("backend.null.url"));
    }

    if (vdbName == null) {
        throw new IllegalArgumentException(Translate
                .get("backend.null.virtualdatabase.name"));
    }

    if (connectionTestStatement == null) {
        throw new IllegalArgumentException(Translate
                .get("backend.null.connection.test"));
    }

    // The logger is needed below, so it is created before the other fields.
    logger = Trace.getLogger(DatabaseBackend.class.getName() + "."
            + vdbName + "." + name);

    if (nbOfWorkerThreads < 2) {
        // Log first: the message must show the rejected value, not the
        // corrected one.
        logger.warn("Invalid number of worker threads ("
                + nbOfWorkerThreads
                + "), re-adjusting to the minimum (2).");
        nbOfWorkerThreads = 2;
    }

    this.name = name;
    this.writeCanBeEnabled = writeCanBeEnabled;
    this.driverPath = driverPath;
    this.driverClassName = driverClassName;
    this.url = url;
    this.virtualDatabaseName = vdbName;
    this.connectionTestStatement = connectionTestStatement;
    this.nbOfWorkerThreads = nbOfWorkerThreads;
    this.connectionManagers = new HashMap();
    this.persistentConnections = new HashMap();
    this.driverCompliance = new DriverCompliance(logger);
    totalRequest = 0;
    // By default, gather all schema information dynamically.
    dynamicPrecision = DatabaseBackendSchemaConstants.DynamicPrecisionAll;
}
0296:
0297: /**
0298: * Creates a new <code>DatabaseBackend</code> object
0299: *
0300: * @param info a backend info object to create a database backend object from
0301: */
0302: public DatabaseBackend(BackendInfo info) {
0303: this (info.getName(), info.getDriverPath(), info
0304: .getDriverClassName(), info.getUrl(), info
0305: .getVirtualDatabaseName(), true, info
0306: .getConnectionTestStatement(), info
0307: .getNbOfWorkerThreads());
0308: setDynamicPrecision(info.getDynamicPrecision(), info
0309: .isGatherSystemTables(), info.getSchemaName());
0310: try {
0311: String xml = info.getXml();
0312: StringReader sreader = new StringReader(xml);
0313: SAXReader reader = new SAXReader();
0314: Document document = reader.read(sreader);
0315: Element root = document.getRootElement();
0316: Iterator iter1 = root.elementIterator();
0317: while (iter1.hasNext()) {
0318: Element elem = (Element) iter1.next();
0319: if (elem.getName().equals(
0320: DatabasesXmlTags.ELT_ConnectionManager)) {
0321: String vuser = elem.valueOf("@"
0322: + DatabasesXmlTags.ATT_vLogin);
0323: String rlogin = elem.valueOf("@"
0324: + DatabasesXmlTags.ATT_rLogin);
0325: String rpassword = elem.valueOf("@"
0326: + DatabasesXmlTags.ATT_rPassword);
0327: Iterator iter2 = elem.elementIterator();
0328: while (iter2.hasNext()) {
0329: Element connectionManager = (Element) iter2
0330: .next();
0331: String cname = connectionManager.getName();
0332: if (cname
0333: .equals(DatabasesXmlTags.ELT_VariablePoolConnectionManager)) {
0334: int minPoolSize = Integer
0335: .parseInt(connectionManager
0336: .valueOf("@"
0337: + DatabasesXmlTags.ATT_minPoolSize));
0338: int maxPoolSize = Integer
0339: .parseInt(connectionManager
0340: .valueOf("@"
0341: + DatabasesXmlTags.ATT_maxPoolSize));
0342: int idleTimeout = Integer
0343: .parseInt(connectionManager
0344: .valueOf("@"
0345: + DatabasesXmlTags.ATT_idleTimeout));
0346: int waitTimeout = Integer
0347: .parseInt(connectionManager
0348: .valueOf("@"
0349: + DatabasesXmlTags.ATT_waitTimeout));
0350: this .addConnectionManager(vuser,
0351: new VariablePoolConnectionManager(
0352: url, name, rlogin,
0353: rpassword, driverPath,
0354: driverClassName,
0355: minPoolSize, maxPoolSize,
0356: idleTimeout, waitTimeout));
0357: } else if (cname
0358: .equals(DatabasesXmlTags.ELT_SimpleConnectionManager)) {
0359: this
0360: .addConnectionManager(
0361: vuser,
0362: new SimpleConnectionManager(
0363: url, name, rlogin,
0364: rpassword,
0365: driverPath,
0366: driverClassName));
0367: } else if (cname
0368: .equals(DatabasesXmlTags.ELT_RandomWaitPoolConnectionManager)) {
0369: int poolSize = Integer
0370: .parseInt(connectionManager
0371: .valueOf("@"
0372: + DatabasesXmlTags.ATT_poolSize));
0373: int timeout = Integer
0374: .parseInt(connectionManager
0375: .valueOf("@"
0376: + DatabasesXmlTags.ATT_timeout));
0377: this
0378: .addConnectionManager(
0379: vuser,
0380: new RandomWaitPoolConnectionManager(
0381: url, name, rlogin,
0382: rpassword,
0383: driverPath,
0384: driverClassName,
0385: poolSize, timeout)); /* Lookup functions */
0386:
0387: } else if (cname
0388: .equals(DatabasesXmlTags.ELT_FailFastPoolConnectionManager)) {
0389: int poolSize = Integer
0390: .parseInt(connectionManager
0391: .valueOf("@"
0392: + DatabasesXmlTags.ATT_poolSize));
0393: this .addConnectionManager(vuser,
0394: new FailFastPoolConnectionManager(
0395: url, name, rlogin,
0396: rpassword, driverPath,
0397: driverClassName, poolSize));
0398: }
0399: }
0400: }
0401: }
0402:
0403: } catch (Exception e) {
0404: logger.error(Translate.get(
0405: "backend.add.connection.manager.failed", e), e);
0406: }
0407: }
0408:
/**
 * Additional constructor for setting a different dynamic schema level (the
 * default is to gather all information). Writes are always allowed on a
 * backend created this way.
 *
 * @param name logical name assigned to this backend
 * @param driverPath path for driver
 * @param driverClassName class name of the database native JDBC driver to
 *          load
 * @param url URL to access the database
 * @param vdbName Name of the virtual database this backend is attached to
 * @param connectionTestStatement SQL statement used to check if a connection
 *          is still valid
 * @param nbOfWorkerThreads defines the number of BackendWorkerThread that
 *          should process writes in parallel (minimum is 2)
 * @param dynamicSchemaLevel for dynamically gathering schema from backend
 */
public DatabaseBackend(String name, String driverPath,
        String driverClassName, String url, String vdbName,
        String connectionTestStatement, int nbOfWorkerThreads,
        String dynamicSchemaLevel) {
    this(name, driverPath, driverClassName, url, vdbName, true,
            connectionTestStatement, nbOfWorkerThreads);
    // Replace the default precision set by the main constructor.
    dynamicPrecision = DatabaseBackendSchemaConstants
            .getDynamicSchemaLevel(dynamicSchemaLevel);
}
0435:
0436: /**
0437: * Returns a deeply copied clone of this backend. Will use the same rewriting
0438: * rules and will get new instance of connection managers with the same
0439: * configuration
0440: *
0441: * @param newName the new name for this new backend
0442: * @param parameters a set of parameters to use to replace values from the
0443: * copied backend. <br>
0444: * The different parameters are: <br>
0445: * <ul>
0446: * <li><tt>driverPath</tt>: the path to the driver</li>
0447: * <li><tt>driver</tt>: the driver class name</li>
0448: * <li><tt>url</tt>: the url to connect to the database</li>
0449: * <li><tt>connectionTestStatement</tt>: the query to test the
0450: * connection</li>
0451: * </ul>
0452: * <br>
0453: * @return <code>DatabaseBackend</code> instance
0454: * @throws Exception if cannot proceed the copy
0455: */
0456: public DatabaseBackend copy(String newName, Map parameters)
0457: throws Exception {
0458: // Get the parameters from the backend if they are not specified, or take
0459: // them from the map of parameters otherwise.
0460: String fromDriverPath = parameters
0461: .containsKey(DatabasesXmlTags.ATT_driverPath) ? (String) parameters
0462: .get(DatabasesXmlTags.ATT_driverPath)
0463: : this .getDriverPath();
0464:
0465: String fromDriverClassName = parameters
0466: .containsKey(DatabasesXmlTags.ATT_driver) ? (String) parameters
0467: .get(DatabasesXmlTags.ATT_driver)
0468: : this .getDriverClassName();
0469:
0470: String fromUrl = parameters
0471: .containsKey(DatabasesXmlTags.ATT_url) ? (String) parameters
0472: .get(DatabasesXmlTags.ATT_url) /* Lookup functions */
0473:
0474: : this .getURL();
0475:
0476: String fromConnectionTestStatement = parameters
0477: .containsKey(DatabasesXmlTags.ATT_connectionTestStatement) ? (String) parameters
0478: .get(DatabasesXmlTags.ATT_connectionTestStatement)
0479: : this .getConnectionTestStatement();
0480:
0481: if (getURL().equals(fromUrl)
0482: && getDriverClassName().equals(fromDriverClassName))
0483: throw new VirtualDatabaseException(
0484: "It is not allowed to clone a backend with the same URL and driver class name");
0485:
0486: // Create the new backend object
0487: DatabaseBackend newBackend = new DatabaseBackend(newName,
0488: fromDriverPath, fromDriverClassName, fromUrl,
0489: virtualDatabaseName, writeCanBeEnabled,
0490: fromConnectionTestStatement, nbOfWorkerThreads);
0491:
0492: // Clone dynamic precision
0493: newBackend.setDynamicPrecision(this .dynamicPrecision,
0494: this .gatherSystemTables, this .schemaName);
0495:
0496: // Set the rewriting rules and the connection managers as the backend we
0497: // are copying
0498: newBackend.rewritingRules = this .rewritingRules;
0499:
0500: // Set Connection managers
0501: Map fromConnectionManagers = this .connectionManagers;
0502: Iterator iter = fromConnectionManagers.keySet().iterator();
0503:
0504: String vlogin = null;
0505: AbstractConnectionManager connectionManager;
0506: while (iter.hasNext()) {
0507: vlogin = (String) iter.next();
0508: connectionManager = (AbstractConnectionManager) fromConnectionManagers
0509: .get(vlogin);
0510: newBackend.addConnectionManager(vlogin, connectionManager
0511: .copy(fromUrl, newName));
0512: }
0513:
0514: return newBackend;
0515: }
0516:
0517: /**
0518: * Two database backends are considered equal if they have the same name, URL
0519: * and driver class name.
0520: *
0521: * @param other an object
0522: * @return a <code>boolean</code> value
0523: */
0524: public boolean equals(Object other) {
0525: if ((other == null) || (!(other instanceof DatabaseBackend)))
0526: return false;
0527: else {
0528: DatabaseBackend b = (DatabaseBackend) other;
0529: return name.equals(b.getName())
0530: && driverClassName.equals(b.getDriverClassName())
0531: && url.equals(b.getURL());
0532: }
0533: }
0534:
0535: /**
0536: * @see java.lang.Object#hashCode()
0537: */
0538: public int hashCode() {
0539: return name.hashCode();
0540: }
0541:
0542: //
0543: // Connection management
0544: //
0545:
0546: /**
0547: * Adds a <code>ConnectionManager</code> to this backend. Note that the
0548: * <code>ConnectionManager</code> is not initialized in this method.
0549: *
0550: * @param vLogin the virtual login corresponding to this connection manager
0551: * @param connectionManager the <code>ConnectionManager</code> to add
0552: */
0553: public void addConnectionManager(String vLogin,
0554: AbstractConnectionManager connectionManager) {
0555: if (connectionManager == null)
0556: throw new IllegalArgumentException(Translate.get(
0557: "backend.null.connection.manager", new String[] {
0558: name, url }));
0559: if (logger.isInfoEnabled())
0560: logger.info(Translate.get(
0561: "backend.add.connection.manager.for.user", vLogin));
0562: connectionManager.setVLogin(vLogin);
0563: connectionManager
0564: .setConnectionTestStatement(getConnectionTestStatement());
0565: connectionManagers.put(vLogin, connectionManager);
0566: }
0567:
0568: /**
0569: * Adds a default connection manager on this backend for the specified user.
0570: * The default connection manager should be specified in the vdb config file.
0571: *
0572: * @param vdbUser user for whom the connnection manager will be added.
0573: * @throws SQLException if connection manager could not be initialized.
0574: */
0575: public void addDefaultConnectionManager(VirtualDatabaseUser vdbUser)
0576: throws SQLException {
0577: if (defaultConnectionManager == null) {
0578: if (logger.isWarnEnabled()) {
0579: logger
0580: .warn("Default connection manager undefined in backend configuration, setting to a VariablePoolConnectionManager");
0581: }
0582: defaultConnectionManager = new VariablePoolConnectionManager(
0583: url, name, vdbUser.getLogin(), vdbUser
0584: .getPassword(), driverPath,
0585: driverClassName, 20, 5, 0, 180, 0);
0586: }
0587: AbstractConnectionManager connectionManager = defaultConnectionManager
0588: .clone(vdbUser.getLogin(), vdbUser.getPassword());
0589: connectionManager.initializeConnections();
0590: addConnectionManager(vdbUser.getLogin(), connectionManager);
0591: }
0592:
0593: /**
0594: * Removes a <code>ConnectionManager</code> to this backend. Note that the
0595: * <code>ConnectionManager</code> is finalized in this method.
0596: *
0597: * @param vdbUser the virtual user corresponding to this connection manager
0598: * @throws SQLException if connection manager could not be finalized.
0599: */
0600: public void removeConnectionManager(VirtualDatabaseUser vdbUser)
0601: throws SQLException {
0602: ((AbstractConnectionManager) connectionManagers.get(vdbUser))
0603: .finalizeConnections();
0604: connectionManagers.remove(vdbUser);
0605: }
0606:
0607: /**
0608: * Check if the driver used by this backend is compliant with Sequoia needs.
0609: *
0610: * @throws SQLException if the driver is not compliant
0611: */
0612: public void checkDriverCompliance() throws SQLException {
0613: if (connectionManagers.isEmpty())
0614: throw new SQLException(Translate.get(
0615: "backend.null.connection.manager", new String[] {
0616: name, url }));
0617:
0618: AbstractConnectionManager connectionManager;
0619: Iterator iter = connectionManagers.values().iterator();
0620: connectionManager = (AbstractConnectionManager) iter.next();
0621:
0622: try {
0623: if (!driverCompliance.complianceTest(url, connectionManager
0624: .getLogin(), connectionManager.getPassword(),
0625: connectionManager.getDriverPath(),
0626: connectionManager.getDriverClassName(),
0627: connectionTestStatement))
0628: throw new SQLException(Translate.get(
0629: "backend.driver.not.compliant",
0630: driverClassName,
0631: ControllerConstants.PRODUCT_NAME));
0632: } catch (ConnectException e) {
0633: throw (SQLException) new SQLException(Translate.get(
0634: "backend.cannot.connect.to", e)).initCause(e);
0635: }
0636: }
0637:
0638: /**
0639: * Sets the default connection manager used for the transparent login feature.
0640: *
0641: * @param defaultConnectionManager connection manager
0642: */
0643: public void setDefaultConnectionManager(
0644: AbstractConnectionManager defaultConnectionManager) {
0645: this .defaultConnectionManager = defaultConnectionManager;
0646: }
0647:
0648: /**
0649: * Initializes the connection managers' connections. The caller must ensure
0650: * that the driver has already been loaded else an exception will be thrown.
0651: *
0652: * @exception SQLException if an error occurs
0653: */
0654: public synchronized void initializeConnections()
0655: throws SQLException {
0656: if (connectionManagers.isEmpty())
0657: throw new SQLException(Translate.get("backend.not.defined",
0658: new String[] { name, url }));
0659:
0660: AbstractConnectionManager connectionManager;
0661: Iterator iter = connectionManagers.values().iterator();
0662: while (iter.hasNext()) {
0663: connectionManager = (AbstractConnectionManager) iter.next();
0664: if (!connectionManager.isInitialized())
0665: connectionManager.initializeConnections();
0666: }
0667: }
0668:
0669: /**
0670: * Releases all the connections to the database held by the connection
0671: * managers.
0672: *
0673: * @throws SQLException if an error occurs
0674: */
0675: public synchronized void finalizeConnections() throws SQLException {
0676: /*
0677: * Remove our references to persistentConnections so that they can garbage
0678: * collected.
0679: */
0680: synchronized (persistentConnections) {
0681: persistentConnections.clear();
0682: }
0683: if (connectionManagers.isEmpty())
0684: throw new SQLException(Translate.get("backend.not.defined",
0685: new String[] { name, url }));
0686:
0687: AbstractConnectionManager connectionManager;
0688: Iterator iter = connectionManagers.values().iterator();
0689: while (iter.hasNext()) {
0690: connectionManager = (AbstractConnectionManager) iter.next();
0691: if (connectionManager.isInitialized())
0692: connectionManager.finalizeConnections();
0693: }
0694: }
0695:
0696: /**
0697: * Force all connections to be renewed when they are used next. This just sets
0698: * a flag and connections will be lazily replaced as needed.
0699: */
0700: public void flagAllConnectionsForRenewal() {
0701: AbstractConnectionManager connectionManager;
0702: Iterator iter = connectionManagers.values().iterator();
0703: while (iter.hasNext()) {
0704: connectionManager = (AbstractConnectionManager) iter.next();
0705: if (connectionManager.isInitialized())
0706: connectionManager.flagAllConnectionsForRenewal();
0707: }
0708: }
0709:
/**
 * Mutex used only as a synchronization point for getting a connection in
 * the getConnection...IfNeeded() method.<br />
 * That method uses a synchronized block on this mutex rather than
 * synchronizing on <code>this</code>, so that the whole system does not hang
 * if the method blocks because no connections are available.
 *
 * @see #getConnectionForTransactionAndLazyBeginIfNeeded(AbstractRequest,
 *      AbstractConnectionManager)
 */
private final Object connectionMutex = new Object();
0721:
0722: /**
0723: * Add a persistent connection to this backend
0724: *
0725: * @param persistentConnectionId id of the persistent connection to add
0726: * @param c the persistent connection to add
0727: */
0728: public void addPersistentConnection(long persistentConnectionId,
0729: PooledConnection c) {
0730: synchronized (persistentConnections) {
0731: persistentConnections.put(new Long(persistentConnectionId),
0732: c);
0733: }
0734: }
0735:
0736: /**
0737: * Returns <code>true</code> if this DatabaseBackend has persistent
0738: * connections, <code>false</code> else.
0739: *
0740: * @return <code>true</code> if this DatabaseBackend has persistent
0741: * connections, <code>false</code> else
0742: */
0743: public boolean hasPersistentConnections() {
0744: return (persistentConnections.size() > 0);
0745: }
0746:
0747: /**
0748: * Remove a persistent connection from this backend
0749: *
0750: * @param persistentConnectionId id of the persistent connection to remove
0751: */
0752: public void removePersistentConnection(long persistentConnectionId) {
0753: synchronized (persistentConnections) {
0754: persistentConnections.remove(new Long(
0755: persistentConnectionId));
0756: if (persistentConnections.isEmpty()) {
0757: persistentConnections.notifyAll();
0758: }
0759: }
0760: }
0761:
0762: /**
0763: * Retrieve a connection for a given transaction or create a new connection
0764: * and start a new transaction. <br>
0765: * This method is internally synchronized on a mutex so that concurrent writes
0766: * within the same transaction that are allowed to execute out of order will
0767: * not open separate connection if they race on transaction begin.
0768: *
0769: * @param request request that will execute (must carry transaction id and
0770: * transaction isolation level (does nothing if equals to
0771: * Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL))
0772: * @param cm connection manager to get the connection from
0773: * @return the connection for the given transaction id
0774: * @throws UnreachableBackendException if the backend is no more reachable
0775: * @throws NoTransactionStartWhenDisablingException if a new transaction
0776: * needed to be started but the backend is in the disabling state
0777: * @throws SQLException if another error occurs
0778: * @see #connectionMutex
0779: */
0780: public Connection getConnectionForTransactionAndLazyBeginIfNeeded(
0781: AbstractRequest request, AbstractConnectionManager cm)
0782: throws UnreachableBackendException,
0783: NoTransactionStartWhenDisablingException, SQLException {
0784: Long tid = new Long(request.getTransactionId());
0785: /*
0786: * An existing transaction that already has a connection must not be blocked
0787: * by transactions that are waiting for connections. Transactions that have
0788: * started and have a connection are past the point where concurrent
0789: * requests are a problem.
0790: */
0791: if (isStartedTransaction(tid)) {
0792: PooledConnection pc = cm
0793: .retrieveConnectionForTransaction(request
0794: .getTransactionId());
0795: if (pc != null) {
0796: return pc.getConnection();
0797: }
0798: }
0799: synchronized (connectionMutex) {
0800: /*
0801: * Repeat the test from above, because a concurrent write request may have
0802: * changed the transaction status while this request was waiting for the
0803: * mutex.
0804: */
0805: if (isStartedTransaction(tid)) { // Transaction has already been started, retrieve connection
0806: PooledConnection pc = cm
0807: .retrieveConnectionForTransaction(request
0808: .getTransactionId());
0809: if ((pc == null) && isDisabling()) {
0810: /**
0811: * The backend is disabling and this transaction was added at
0812: * disabling time because it was already logged but we didn't lazily
0813: * start it yet, so let's do the lazy start now.
0814: *
0815: * @see RequestManager#disableBackendStoreCheckpointAndSetDisabling(DatabaseBackend,
0816: * String)
0817: */
0818: Connection c = AbstractLoadBalancer
0819: .getConnectionAndBeginTransaction(this , cm,
0820: request);
0821: if (c == null) {
0822: if (logger.isWarnEnabled()) {
0823: logger
0824: .warn("Null connection returned from AbstractLoadBalancer.getConnectionAndBeginTransaction() (backend is disabling)");
0825: }
0826: }
0827: return c;
0828: }
0829: if ((pc == null) && !isDisabling()) { // Sanity check, should never happen
0830: if (logger.isErrorEnabled()) {
0831: logger.error("Null connection [tid = "
0832: + request.getTransactionId()
0833: + ", backend state = " + state
0834: + ", active tx = " + activeTransactions
0835: + "]");
0836: }
0837: return null;
0838: }
0839: return pc.getConnection();
0840: } else {
0841: if (!canAcceptTasks(request))
0842: throw new NoTransactionStartWhenDisablingException();
0843:
0844: Connection c = null;
0845:
0846: // Transaction has not been started yet, this is a lazy begin
0847: c = AbstractLoadBalancer
0848: .getConnectionAndBeginTransaction(this , cm,
0849: request);
0850: // begin transaction
0851: startTransaction(tid);
0852:
0853: if (c == null) {
0854: if (logger.isWarnEnabled()) {
0855: logger
0856: .warn("Null connection returned from AbstractLoadBalancer.getConnectionAndBeginTransaction() [state = "
0857: + state + "]");
0858: }
0859: }
0860: return c;
0861: }
0862: } // end of synchronized (connectionMutex)
0863: }
0864:
0865: /**
0866: * Returns the <code>ConnectionManager</code> associated to this backend for
0867: * a given virtual login.
0868: *
0869: * @param vLogin the virtual login
0870: * @return an <code>AbstractConnectionManager</code> instance
0871: */
0872: public AbstractConnectionManager getConnectionManager(String vLogin) {
0873: return (AbstractConnectionManager) connectionManagers
0874: .get(vLogin);
0875: }
0876:
0877: /**
0878: * Returns the SQL statement to use to check the connection validity.
0879: *
0880: * @return a <code>String</code> containing a SQL statement
0881: */
0882: public String getConnectionTestStatement() {
0883: return connectionTestStatement;
0884: }
0885:
0886: /**
0887: * Retrieves the SQLWarning chained to the given persistent connection
0888: *
0889: * @param connId persistent connection id to retrieve warnings from
0890: * @exception SQLException if a database access error occurs or this method is
0891: * called on a closed connection
0892: * @return the warnings on the given connection or null
0893: */
0894: public SQLWarning getPersistentConnectionWarnings(long connId)
0895: throws SQLException {
0896: if (persistentConnections != null) {
0897: PooledConnection pc = (PooledConnection) persistentConnections
0898: .get(new Long(connId));
0899: if (pc != null)
0900: return pc.getConnection().getWarnings();
0901: }
0902: return null;
0903: }
0904:
0905: /**
0906: * Clears the SQLWarning chained to the given persistent connection
0907: *
0908: * @param connId persistent connection id to retrieve warnings from
0909: * @exception SQLException if a database access error occurs
0910: */
0911: public void clearPersistentConnectionWarnings(long connId)
0912: throws SQLException {
0913: if (persistentConnections != null) {
0914: PooledConnection pc = (PooledConnection) persistentConnections
0915: .get(new Long(connId));
0916: if (pc != null)
0917: pc.getConnection().clearWarnings();
0918: }
0919: }
0920:
0921: /**
0922: * Check if the given connection is valid or not. This function issues the
0923: * connectionTestStatement query on the connection and if it succeeds then the
0924: * connection is declared valid. If an exception occurs, the connection is
0925: * declared invalid.
0926: *
0927: * @param connection the connection to test
0928: * @return true if the connection is valid
0929: */
0930: public boolean isValidConnection(Connection connection) {
0931: try {
0932: Statement s = connection.createStatement();
0933: s.executeQuery(connectionTestStatement);
0934: } catch (SQLException e) {
0935: if ("25P02".equals(e.getSQLState())
0936: || (e.getMessage() != null && e
0937: .getMessage()
0938: .indexOf(
0939: "ignored until end of transaction block") > 0)) {
0940: // see bug item #300873 on the forge for details
0941: // PostgreSQL throws an exception if a query is issued after a request
0942: // has failed within a transaction, we now have to check for this
0943: // exception as it means the connection is valid
0944: //
0945: // PostgreSQL versions after 7.4 will return the SQLState if version 3
0946: // of the protocol is used (it is the default version unless you use
0947: // protocolVersion parameter in your JDBC url), whereas PostgreSQL
0948: // versions prior to 7.4 (version 2 of the protocol) will have to be
0949: // checked for the message text
0950: return true;
0951: }
0952: return false;
0953: }
0954: return true;
0955: }
0956:
0957: /**
0958: * Tell all the connection managers to stop handing out new Connections.
0959: */
0960: public synchronized void shutdownConnectionManagers() {
0961: for (Iterator iter = connectionManagers.values().iterator(); iter
0962: .hasNext();) {
0963: AbstractConnectionManager cm = (AbstractConnectionManager) iter
0964: .next();
0965: if (cm.isInitialized()) {
0966: cm.shutdown();
0967: }
0968: }
0969: }
0970:
0971: //
0972: // Transaction management
0973: //
0974:
0975: /**
0976: * Adds a savepoint to a given transaction
0977: *
0978: * @param tid transaction identifier
0979: * @param savepoint savepoint to add
0980: */
0981: public void addSavepoint(Long tid, Savepoint savepoint) {
0982: synchronized (savepoints) {
0983: List savepointList = (List) savepoints.get(tid);
0984: if (savepointList == null) { // Lazy list creation
0985: savepointList = new ArrayList();
0986: savepoints.put(tid, savepointList);
0987: }
0988: savepointList.add(savepoint);
0989: }
0990: }
0991:
0992: /**
0993: * Returns a Savepoint identified its name for a given transaction or
0994: * <code>null</code> if no such Savepoint exists.
0995: *
0996: * @param tid transaction identifier
0997: * @param savepointName name of the savepoint
0998: * @return a savepoint or <code>null</code> if there is no savepoint of the
0999: * given <code>savePointName</code> for the transaction identified
1000: * by the <code>tid</code>
1001: */
1002: public Savepoint getSavepoint(Long tid, String savepointName) {
1003: synchronized (savepoints) {
1004: List savepointList = (List) savepoints.get(tid);
1005: if (savepointList == null)
1006: return null; // No checkpoint for that transaction
1007:
1008: Iterator i = savepointList.iterator();
1009: while (i.hasNext()) {
1010: try {
1011: Savepoint savepoint = (Savepoint) i.next();
1012: if (savepointName.equals(savepoint
1013: .getSavepointName()))
1014: return savepoint;
1015: } catch (SQLException ignore) {
1016: // We should never get here because we always use named savepoints
1017: // on backends
1018: }
1019: }
1020: }
1021:
1022: // No savepoint has been found for given savepoint name
1023: return null;
1024: }
1025:
1026: /**
1027: * Returns <code>true</code> if the specified transaction has been started
1028: * on this backend (a connection has been allocated for this transaction).
1029: *
1030: * @param tid transaction identifier
1031: * @return <code>true</code> if the transaction has been started
1032: */
1033: public boolean isStartedTransaction(Long tid) {
1034: synchronized (activeTransactions) {
1035: return activeTransactions.contains(tid);
1036: }
1037: }
1038:
1039: /**
1040: * Removes a savepoint for a given transaction
1041: *
1042: * @param tid transaction identifier
1043: * @param savepoint savepoint to remove
1044: */
1045: public void removeSavepoint(Long tid, Savepoint savepoint) {
1046: synchronized (savepoints) {
1047: List savepointList = (List) savepoints.get(tid);
1048: if (savepointList == null)
1049: logger.error("No savepoints found for transaction "
1050: + tid);
1051: else
1052: savepointList.remove(savepoint);
1053: }
1054: }
1055:
1056: /**
1057: * Signals that a transaction has been started on this backend. It means that
1058: * a connection has been allocated for this transaction.
1059: *
1060: * @param tid transaction identifier
1061: */
1062: public void startTransaction(Long tid) {
1063: synchronized (activeTransactions) {
1064: totalTransactions++;
1065: activeTransactions.add(tid);
1066: }
1067: }
1068:
1069: /**
1070: * Signals that a transaction has been stopped on this backend. It means that
1071: * the connection has been released for this transaction.
1072: *
1073: * @param tid transaction identifier
1074: */
1075: public void stopTransaction(Long tid) {
1076: synchronized (activeTransactions) {
1077: if (!activeTransactions.remove(tid))
1078: throw new IllegalArgumentException(Translate.get(
1079: "backend.transaction.not.started",
1080: new String[] { "" + tid, name }));
1081: // If this was the last open transaction, we notify people possibly
1082: // waiting on waitForAllTransactionsToComplete()
1083: if (activeTransactions.isEmpty()) {
1084: activeTransactions.notifyAll();
1085: }
1086: }
1087:
1088: synchronized (savepoints) {
1089: savepoints.remove(tid);
1090: }
1091: }
1092:
1093: /**
1094: * This method waits until all currently open transactions and persistent
1095: * connections on this backend complete. If no transaction are currently
1096: * running on this backend or no persistent connection is open, this method
1097: * immediately returns.
1098: */
1099: public void waitForAllTransactionsAndPersistentConnectionsToComplete() {
1100: // Wait for active transactions to complete
1101: synchronized (activeTransactions) {
1102: if (!activeTransactions.isEmpty()) {
1103: if (logger.isInfoEnabled())
1104: logger
1105: .info("Backend "
1106: + name
1107: + " wait for "
1108: + activeTransactions.size()
1109: + " transactions to complete before disabling completely.");
1110:
1111: try {
1112: activeTransactions.wait();
1113: } catch (InterruptedException ignore) {
1114: }
1115: }
1116: }
1117:
1118: // Wait for active persistent connections to close
1119: synchronized (persistentConnections) {
1120: if (!persistentConnections.isEmpty()) {
1121: if (logger.isInfoEnabled())
1122: logger
1123: .info("Backend "
1124: + name
1125: + " wait for "
1126: + persistentConnections.size()
1127: + " persistent connections to close before disabling completely.");
1128:
1129: try {
1130: persistentConnections.wait();
1131: } catch (InterruptedException ignore) {
1132: }
1133: }
1134: }
1135: }
1136:
1137: //
1138: // State Management
1139: //
1140:
1141: /**
1142: * Returns true if the backend is in a state that allows it to accept tasks or
1143: * the execution of the specified request.
1144: *
1145: * @param request the request that is going to execute in that task (null if
1146: * not applicable).
1147: * @return Returns true if backend isReadEnabled() or isWriteEnabled() or
1148: * isReplaying()
1149: */
1150: public boolean canAcceptTasks(AbstractRequest request) {
1151: if (request != null)
1152: return canAcceptTasks(request.isPersistentConnection(),
1153: request.getPersistentConnectionId());
1154: else
1155: return canAcceptTasks(false, -1);
1156: }
1157:
1158: /**
1159: * Returns true if the backend is in a state that allows it to accept tasks or
1160: * the execution for the specified persistent connection.
1161: *
1162: * @param persistentConnectionId the persistent connection id of the request
1163: * that is going to execute .
1164: * @return Returns true if backend isReadEnabled() or isWriteEnabled() or
1165: * isReplaying()
1166: */
1167: public boolean canAcceptTasks(long persistentConnectionId) {
1168: return canAcceptTasks(true, persistentConnectionId);
1169: }
1170:
1171: private boolean canAcceptTasks(boolean isPersistentConnection,
1172: long persistentConnectionId) {
1173: // return isReadEnabled() || isWriteEnabled() || isReplaying();
1174: boolean acceptTask = state == BackendState.READ_ENABLED_WRITE_DISABLED
1175: || state == BackendState.READ_ENABLED_WRITE_ENABLED
1176: || state == BackendState.READ_DISABLED_WRITE_ENABLED
1177: || state == BackendState.REPLAYING;
1178:
1179: if (!acceptTask && isPersistentConnection) {
1180: // Check if the request is on one of our active persistent connections in
1181: // which case we have to execute it.
1182: synchronized (persistentConnections) {
1183: return persistentConnections.containsKey(new Long(
1184: persistentConnectionId));
1185: }
1186: }
1187: return acceptTask;
1188: }
1189:
1190: /**
1191: * Cleans transactions and pending requests / tasks states
1192: */
1193: private void cleanBackendStates() {
1194: activeTransactions.clear();
1195: savepoints.clear();
1196: pendingRequests.clear();
1197: pendingTasks.clear();
1198: if (!isSchemaStatic()) {
1199: setSchemaIsDirty(true, null);
1200: // make sure locks from old transactions are not carried over
1201: schema = null;
1202: }
1203: }
1204:
1205: /**
1206: * Sets the database backend state to disable. This state is just an
1207: * indication and it has no semantic effect. It is up to the request manager
1208: * (especially the load balancer) to ensure that no more requests are sent to
1209: * this backend.
1210: *
1211: * @return false if the backend was already in the disabled state, true
1212: * otherwise
1213: */
1214: public synchronized boolean disable() {
1215: if (getStateValue() == BackendState.DISABLED) {
1216: return false;
1217: }
1218: setState(BackendState.DISABLED);
1219:
1220: return true;
1221: }
1222:
1223: /**
1224: * Disables the database backend for reads. This does not affect write ability
1225: */
1226: public synchronized void disableRead() {
1227: if (isWriteEnabled())
1228: setState(BackendState.READ_DISABLED_WRITE_ENABLED);
1229: else
1230: setState(BackendState.DISABLED);
1231: }
1232:
1233: /**
1234: * Disables the database backend for writes. This does not affect read ability
1235: * although the backend will not be coherent anymore as soon as a write as
1236: * occured. This should be used in conjunction with a checkpoint to recover
1237: * missing writes.
1238: */
1239: public synchronized void disableWrite() {
1240: if (isReadEnabled())
1241: setState(BackendState.READ_ENABLED_WRITE_DISABLED);
1242: else
1243: setState(BackendState.DISABLED);
1244: }
1245:
1246: /**
1247: * Enables the database backend for reads. This method should only be called
1248: * when the backend is synchronized with the others.
1249: */
1250: public synchronized void enableRead() {
1251: if (isWriteEnabled())
1252: setState(BackendState.READ_ENABLED_WRITE_ENABLED);
1253: else
1254: setState(BackendState.READ_ENABLED_WRITE_DISABLED);
1255: }
1256:
1257: /**
1258: * Enables the database backend for writes. This method should only be called
1259: * when the backend is synchronized with the others.
1260: */
1261: public synchronized void enableWrite() {
1262: // Remove last known checkpoint since backend will now be modified and no
1263: // more synchronized with the checkpoint.
1264: setLastKnownCheckpoint(null);
1265: if (isReadEnabled())
1266: setState(BackendState.READ_ENABLED_WRITE_ENABLED);
1267: else
1268: setState(BackendState.READ_DISABLED_WRITE_ENABLED);
1269: }
1270:
1271: /**
1272: * Returns the lastKnownCheckpoint value.
1273: *
1274: * @return Returns the lastKnownCheckpoint.
1275: */
1276: public String getLastKnownCheckpoint() {
1277: return lastKnownCheckpoint;
1278: }
1279:
1280: /**
1281: * Retrieve the state of the backend.
1282: *
1283: * @see SequoiaNotificationList#VIRTUALDATABASE_BACKEND_DISABLED
1284: * @see SequoiaNotificationList#VIRTUALDATABASE_BACKEND_RECOVERING
1285: * @see SequoiaNotificationList#VIRTUALDATABASE_BACKEND_BACKINGUP
1286: * @see SequoiaNotificationList#VIRTUALDATABASE_BACKEND_DISABLING
1287: * @see SequoiaNotificationList#VIRTUALDATABASE_BACKEND_ENABLED
1288: * @see SequoiaNotificationList#VIRTUALDATABASE_BACKEND_DISABLED
1289: * @return one of the above
1290: */
1291: // FIXME should not return key for i18n but i18n translation instead
1292: public String getState() {
1293: switch (state) {
1294: case BackendState.READ_ENABLED_WRITE_DISABLED:
1295: return SequoiaNotificationList.VIRTUALDATABASE_BACKEND_ENABLED;
1296: case BackendState.READ_ENABLED_WRITE_ENABLED:
1297: return SequoiaNotificationList.VIRTUALDATABASE_BACKEND_ENABLED_WRITE;
1298: case BackendState.READ_DISABLED_WRITE_ENABLED:
1299: return SequoiaNotificationList.VIRTUALDATABASE_BACKEND_ENABLED_WRITE;
1300: case BackendState.DISABLING:
1301: return SequoiaNotificationList.VIRTUALDATABASE_BACKEND_DISABLING;
1302: case BackendState.BACKUPING:
1303: return SequoiaNotificationList.VIRTUALDATABASE_BACKEND_BACKINGUP;
1304: case BackendState.RESTORING:
1305: return SequoiaNotificationList.VIRTUALDATABASE_BACKEND_RECOVERING;
1306: case BackendState.REPLAYING:
1307: return SequoiaNotificationList.VIRTUALDATABASE_BACKEND_REPLAYING;
1308: case BackendState.DISABLED:
1309: return SequoiaNotificationList.VIRTUALDATABASE_BACKEND_DISABLED;
1310: case BackendState.UNKNOWN:
1311: return SequoiaNotificationList.VIRTUALDATABASE_BACKEND_UNKNOWN;
1312: default:
1313: throw new IllegalArgumentException("Unknown backend state:"
1314: + state);
1315: }
1316: }
1317:
1318: /**
1319: * Return the integer value corresponding to the state of the backend. The
1320: * values are defined in <code>BackendState</code>
1321: *
1322: * @return <tt>int</tt> value
1323: * @see BackendState
1324: */
1325: public int getStateValue() {
1326: return state;
1327: }
1328:
1329: /**
1330: * Returns the isBackuping value.
1331: *
1332: * @return Returns the isBackuping.
1333: */
1334: public boolean isBackuping() {
1335: return state == BackendState.BACKUPING;
1336: }
1337:
1338: /**
1339: * Is the backend completely disabled ? This usually means it has a known
1340: * state with a checkpoint associated to it.
1341: *
1342: * @return <code>true</code> if the backend is disabled
1343: */
1344: public boolean isDisabled() {
1345: return state == BackendState.DISABLED;
1346: }
1347:
1348: /**
1349: * Returns the isDisabling value.
1350: *
1351: * @return Returns the isDisabling.
1352: */
1353: public boolean isDisabling() {
1354: return state == BackendState.DISABLING;
1355: }
1356:
1357: /**
1358: * Tests if this backend is initialized
1359: *
1360: * @return <code>true</code> if this backend is initialized
1361: * @throws SQLException if an error occurs
1362: */
1363: public synchronized boolean isInitialized() throws SQLException {
1364: if (connectionManagers.isEmpty())
1365: throw new SQLException(Translate.get(
1366: "backend.null.connection.manager", new String[] {
1367: name, url }));
1368: Iterator iter = connectionManagers.values().iterator();
1369: while (iter.hasNext()) {
1370: if (!((AbstractConnectionManager) iter.next())
1371: .isInitialized())
1372: return false;
1373: }
1374: return true;
1375: }
1376:
1377: /**
1378: * Is the backend accessible ?
1379: *
1380: * @return <tt>true</tt> if a jdbc connection is still possible from the
1381: * controller
1382: */
1383: public synchronized boolean isJDBCConnected() {
1384: Connection con = null;
1385: try {
1386: if (connectionManagers.isEmpty())
1387: throw new SQLException(Translate.get(
1388: "backend.null.connection.manager",
1389: new String[] { name, url }));
1390:
1391: AbstractConnectionManager connectionManager;
1392: Iterator iter = connectionManagers.values().iterator();
1393: connectionManager = (AbstractConnectionManager) iter.next();
1394:
1395: con = connectionManager.getConnectionFromDriver();
1396: if (con == null) {
1397: return false;
1398: }
1399: con.createStatement().execute(this .connectionTestStatement);
1400: return true;
1401: } catch (Exception e) {
1402: String msg = Translate.get(
1403: "loadbalancer.backend.unreacheable", name);
1404: logger.warn(msg, e);
1405: return false;
1406: } finally {
1407: if (con != null) {
1408: try {
1409: con.close();
1410: } catch (SQLException e) {
1411: return false;
1412: }
1413: }
1414: }
1415: }
1416:
1417: /**
1418: * Returns true if the backend cannot be used anymore
1419: *
1420: * @return Returns true if the backend was removed from activity by the load
1421: * balancer
1422: */
1423: // TODO nobody uses this method. Should be removed
1424: public boolean isKilled() {
1425: return state == BackendState.UNKNOWN;
1426: }
1427:
1428: /**
1429: * Tests if this backend is read enabled (active and synchronized).
1430: *
1431: * @return <code>true</code> if this backend is enabled.
1432: */
1433: public synchronized boolean isReadEnabled() {
1434: return state == BackendState.READ_ENABLED_WRITE_DISABLED
1435: || state == BackendState.READ_ENABLED_WRITE_ENABLED;
1436: }
1437:
1438: /**
1439: * Returns true if the backend is in the BackendState.RECOVERING state
1440: *
1441: * @return Returns true if the backend is restoring a dump
1442: */
1443: // TODO nobody uses this method. Should be removed
1444: public boolean isRestoring() {
1445: return state == BackendState.RESTORING;
1446: }
1447:
1448: /**
1449: * Returns true if the backend is in the BackendState.REPLAYING state
1450: *
1451: * @return Returns true if the backend is replaying the recovery
1452: */
1453: public boolean isReplaying() {
1454: return state == BackendState.REPLAYING;
1455: }
1456:
1457: /**
1458: * Tests if this backend is write enabled (active and synchronized).
1459: *
1460: * @return <code>true</code> if this backend is enabled.
1461: */
1462: public synchronized boolean isWriteEnabled() {
1463: return state == BackendState.READ_ENABLED_WRITE_ENABLED
1464: || state == BackendState.READ_DISABLED_WRITE_ENABLED;
1465: }
1466:
1467: /**
1468: * Returns the writeCanBeEnabled value.
1469: *
1470: * @return Returns the writeCanBeEnabled.
1471: */
1472: public boolean isWriteCanBeEnabled() {
1473: return writeCanBeEnabled;
1474: }
1475:
1476: /**
1477: * Sets the NotificationBroadcasterSupport that this backend can use to send
1478: * JMX notification.
1479: *
1480: * @param notificationBroadcaster a NotificationBroadcasterSupport
1481: */
1482: public void setNotificationBroadcaster(
1483: NotificationBroadcasterSupport notificationBroadcaster) {
1484: this .notificationBroadcaster = notificationBroadcaster;
1485: }
1486:
1487: /**
1488: * Notify JMX NotificationListener that the state of this DatabaseBackend has
1489: * changed. The emmitted Notification is a
1490: * <code>AttributeChangeNotification</code> with an attributeName set to
1491: * <code>"StateValue"</code> and an attributeType set to
1492: * <code>Integer</code>
1493: *
1494: * @param message a message about the backend state change
1495: * @param oldState the previous state of the backend
1496: * @param currentState the current state of the backend
1497: * @see BackendState
1498: * @see #getStateValue()
1499: */
1500: private void notifyJMXStateChanged(String message, int oldState,
1501: int currentState) {
1502: try {
1503: Notification attrChangeNotification = new AttributeChangeNotification(
1504: JmxConstants.getDatabaseBackendObjectName(
1505: virtualDatabaseName, name),
1506: notificationSequence++, new Date().getTime(),
1507: message, "StateValue", "Integer", //$NON-NLS-1$ //$NON-NLS-2$
1508: new Integer(oldState), new Integer(currentState));
1509: sendNotification(attrChangeNotification);
1510: } catch (MalformedObjectNameException e) {
1511: logger.warn("Unable to send JMX notification", e);
1512: }
1513: }
1514:
1515: /**
1516: * Sends JMX notification
1517: *
1518: * @param type notification type
1519: * @see SequoiaNotificationList
1520: */
1521: public void notifyJmx(String type) {
1522: notifyJmx(type,
1523: SequoiaNotificationList.NOTIFICATION_LEVEL_INFO,
1524: Translate.get(type, getName()));
1525: }
1526:
1527: /**
1528: * Sends JMX error notification
1529: *
1530: * @param e <tt>Exception</tt> object. Only the message will be used
1531: * @param type notification type
1532: * @see SequoiaNotificationList
1533: */
1534: public void notifyJmxError(String type, Exception e) {
1535: notifyJmx(type,
1536: SequoiaNotificationList.NOTIFICATION_LEVEL_ERROR,
1537: Translate.get(type, new String[] { getName(),
1538: e.getMessage() }));
1539: }
1540:
1541: /**
1542: * Sends a JMX notification.
1543: *
1544: * @param type Type of JMX notification
1545: * @param level unused parameter (will be deprecated)
1546: * @param message Message of the notification
1547: */
1548: private void notifyJmx(String type, String level, String message) {
1549: try {
1550: Notification notification = (new Notification(type,
1551: JmxConstants.getDatabaseBackendObjectName(
1552: virtualDatabaseName, name),
1553: notificationSequence++, message));
1554:
1555: sendNotification(notification);
1556: } catch (MalformedObjectNameException e) {
1557: logger.warn("Unable to send JMX notification", e);
1558: }
1559: }
1560:
1561: /**
1562: * Sends a JMX notification.
1563: *
1564: * @param notification a JMX Notification to send
1565: */
1566: private void sendNotification(Notification notification) {
1567: if (MBeanServerManager.isJmxEnabled()) {
1568: notificationBroadcaster.sendNotification(notification);
1569: }
1570: }
1571:
1572: /**
1573: * Notify the state of the backend has changed.<br />
1574: * This notification triggers the update of the state of the backend stored in
1575: * the recovery log.
1576: */
1577: private void notifyStateListener() {
1578: if (stateListener != null)
1579: stateListener.stateChanged(this );
1580: }
1581:
1582: /**
1583: * Sets the stateListener value.
1584: *
1585: * @param stateListener The stateListener to set.
1586: */
1587: public void setStateListener(BackendStateListener stateListener) {
1588: this .stateListener = stateListener;
1589: }
1590:
1591: /**
1592: * This is used when the backend must be disabled but currently open
1593: * transactions must terminate. This is a transitional state. When disabling
1594: * is complete the caller must set the backend state to disabled.
1595: * <p>
1596: * Reads are no more allowed on the backend and the state is updated so that
1597: * isReadEnabled() returns false.
1598: *
1599: * @see #disable()
1600: * @see #isReadEnabled()
1601: * @deprecated not used anymore. Please use the setState method instead
1602: */
1603: // TODO nobody uses this method. Should be removed
1604: public void setDisabling() {
1605: setState(BackendState.DISABLING);
1606: }
1607:
1608: /**
1609: * setLastKnownCheckpoint for this backend
1610: *
1611: * @param checkpoint the checkpoint
1612: */
1613: public void setLastKnownCheckpoint(String checkpoint) {
1614: this .lastKnownCheckpoint = checkpoint;
1615: notifyStateListener(); // triggers recovery log update
1616: }
1617:
1618: /**
1619: * Set the state of a backend
1620: *
1621: * @param state see BackendState for a possible list of the different state
1622: * @see org.continuent.sequoia.common.jmx.management.BackendState
1623: */
1624: // FIXME should use a type-safe enum to represent backend state instead of
1625: // ints
1626: public synchronized void setState(int state) {
1627: switch (state) {
1628: case BackendState.UNKNOWN:
1629: lastKnownCheckpoint = null;
1630: break;
1631: case BackendState.RESTORING:
1632: case BackendState.REPLAYING:
1633: cleanBackendStates();
1634: break;
1635: case BackendState.READ_ENABLED_WRITE_DISABLED:
1636: case BackendState.READ_ENABLED_WRITE_ENABLED:
1637: case BackendState.READ_DISABLED_WRITE_ENABLED:
1638: case BackendState.DISABLING:
1639: case BackendState.BACKUPING:
1640: case BackendState.DISABLED:
1641: break;
1642: default:
1643: throw new IllegalArgumentException("Unknown backend state:"
1644: + state);
1645: }
1646: int oldState = this .state;
1647: this .state = state;
1648: int currentState = this .state;
1649: if (logger.isDebugEnabled())
1650: logger.debug(Translate.get("backend.state.changed",
1651: new String[] { getQualifiedName(), getState() }));
1652: endUserLogger.info(Translate.get("backend.state.changed",
1653: new String[] { getQualifiedName(),
1654: BackendState.description(currentState) }));
1655:
1656: notifyStateListener();
1657: notifyJMXStateChanged(Translate.get(getState(),
1658: getQualifiedName()), oldState, currentState);
1659: }
1660:
1661: //
1662: // Schema manipulation
1663: //
1664:
1665: /**
1666: * Add a specific semantic to a particular stored procedure
1667: *
1668: * @param procedureName the stored procedure name
1669: * @param parameterCount the number of parameters of the stored procedure
1670: * @param semantic the semantic information
1671: */
1672: public void addStoredProcedureSemantic(String procedureName,
1673: int parameterCount, DatabaseProcedureSemantic semantic) {
1674: storedProcedureSemantics.put(DatabaseProcedure.buildKey(
1675: procedureName, parameterCount), semantic);
1676: }
1677:
1678: /**
1679: * Checks that the current database schema is compatible with all schema
1680: * gathered from each connection manager.
1681: * <p>
1682: * If no schema has been defined, the first gathered schema is used as the
1683: * current database schema.
1684: * <p>
1685: * For each schema that is not compatible with the current schema, a warning
1686: * is issued on the logger for that backend
1687: *
1688: * @param c optional connection from which the schema should be fetched (null
1689: * if not applicable)
1690: * @return true if compatible, false otherwise
1691: */
1692: public synchronized boolean checkDatabaseSchema(Connection c) {
1693: if (logger.isDebugEnabled())
1694: logger.debug(Translate.get("backend.dynamic.schema",
1695: DatabaseBackendSchemaConstants
1696: .getDynamicSchemaLevel(dynamicPrecision)));
1697:
1698: boolean checked = true;
1699: if (c == null) {
1700: AbstractConnectionManager connectionMananger;
1701: Iterator iter = connectionManagers.values().iterator();
1702: while (iter.hasNext()) {
1703: connectionMananger = (AbstractConnectionManager) iter
1704: .next();
1705:
1706: // Gather the database schema from this connection manager
1707: DatabaseBackendMetaData meta = new DatabaseBackendMetaData(
1708: connectionMananger, logger, dynamicPrecision,
1709: gatherSystemTables, virtualDatabaseName,
1710: schemaName);
1711:
1712: DatabaseSchema metaSchema;
1713: try {
1714: if (logger.isDebugEnabled())
1715: logger
1716: .debug(Translate
1717: .get("backend.gathering.database.schema"));
1718: metaSchema = meta.getDatabaseSchema();
1719: } catch (SQLException e) {
1720: if (logger.isWarnEnabled())
1721: logger.warn(Translate.get(
1722: "backend.gather.schema.failed", e));
1723: return false;
1724: }
1725: if (schema == null) {
1726: if (logger.isDebugEnabled())
1727: logger
1728: .debug(Translate
1729: .get("backend.use.gathered.schema.as.new"));
1730: schema = metaSchema;
1731: } else {
1732: if (dynamicPrecision == DatabaseBackendSchemaConstants.DynamicPrecisionStatic) {
1733: if (logger.isInfoEnabled())
1734: logger.info(Translate.get(
1735: "backend.schema.static.no.check",
1736: name));
1737: } else {
1738: if (logger.isInfoEnabled())
1739: logger
1740: .info(Translate
1741: .get("backend.check.schema.compatibility"));
1742: if (schema.isCompatibleSubset(metaSchema))
1743: logger
1744: .info(Translate
1745: .get(
1746: "backend.schema.compatible.for.login",
1747: connectionMananger
1748: .getLogin()));
1749: else {
1750: checked = false;
1751: logger
1752: .warn(Translate
1753: .get(
1754: "backend.schema.not.compatible.for.login",
1755: connectionMananger
1756: .getLogin()));
1757: }
1758: }
1759: }
1760: }
1761: } else { // Fetch schema from given connection
1762: try {
1763: schema = new DatabaseSQLMetaData(logger, c,
1764: dynamicPrecision, gatherSystemTables,
1765: schemaName)
1766: .createDatabaseSchema(virtualDatabaseName);
1767: } catch (SQLException e) {
1768: if (logger.isInfoEnabled())
1769: logger.info(
1770: "Failed to fetch schema from given connection "
1771: + c, e);
1772: return checkDatabaseSchema(null);
1773: }
1774: }
1775:
1776: setSchemaIsDirty(false, null);
1777:
1778: if (schema != null) {
1779: // Update stored procedure information
1780:
1781: if (defaultStoredProcedureSemantic != null) { // Apply default semantic to all stored procedures
1782: for (Iterator iterator = schema.getProcedures()
1783: .values().iterator(); iterator.hasNext();) {
1784: DatabaseProcedure proc = (DatabaseProcedure) iterator
1785: .next();
1786: if (proc.getSemantic() == null)
1787: proc
1788: .setSemantic(defaultStoredProcedureSemantic);
1789: }
1790: }
1791:
1792: // Apply specific stored procedure information
1793: for (Iterator iterator = storedProcedureSemantics.keySet()
1794: .iterator(); iterator.hasNext();) {
1795: String procedureKey = (String) iterator.next();
1796: DatabaseProcedureSemantic semantic = (DatabaseProcedureSemantic) storedProcedureSemantics
1797: .get(procedureKey);
1798: DatabaseProcedure proc = schema
1799: .getProcedure(procedureKey);
1800: if (proc != null) { // Update semantic information for this proc found in the schema
1801: proc.setSemantic(semantic);
1802: } else { // Add the stored procedure not found in the schema
1803: int parenthesis = procedureKey.indexOf("(");
1804: String procName = procedureKey.substring(0,
1805: parenthesis);
1806: proc = new DatabaseProcedure(procName, "",
1807: DatabaseProcedure.ProcedureResultUnknown);
1808: int paramCount = Integer.valueOf(
1809: procedureKey.substring(parenthesis + 1,
1810: procedureKey.indexOf(")",
1811: parenthesis))).intValue();
1812: ArrayList params = new ArrayList(paramCount);
1813: for (int i = 0; i < paramCount; i++)
1814: params.add("Param" + i);
1815: proc.setParameters(params);
1816: proc.setSemantic(semantic);
1817: schema.addProcedure(proc);
1818: }
1819: }
1820: }
1821: return checked;
1822: }
1823:
1824: /**
1825: * Returns the schema of this database.
1826: *
1827: * @return the schema of this database. Returns <code>null</code> if the
1828: * schema has not been set.
1829: * @see #setDatabaseSchema(DatabaseSchema, boolean)
1830: */
1831: public synchronized DatabaseSchema getDatabaseSchema() {
1832: if (schemaIsNeededByVdb && schemaIsDirty && !schemaIsStatic)
1833: refreshSchema(null);
1834: return schema;
1835: }
1836:
1837: /**
1838: * Get the Database static metadata from this backend using a connection from
1839: * the first available connection manager.
1840: *
1841: * @return Static metadata information
1842: */
1843: public MetadataContainer getDatabaseStaticMetadata() {
1844: AbstractConnectionManager connectionMananger;
1845: Iterator iter = connectionManagers.values().iterator();
1846: if (iter.hasNext()) {
1847: connectionMananger = (AbstractConnectionManager) iter
1848: .next();
1849: // Gather the static metadata from the first connection manager
1850: DatabaseBackendMetaData meta = new DatabaseBackendMetaData(
1851: connectionMananger, logger, dynamicPrecision,
1852: gatherSystemTables, virtualDatabaseName, schemaName);
1853: try {
1854: return meta.retrieveDatabaseMetadata();
1855: } catch (SQLException e) {
1856: return null;
1857: }
1858: } else
1859: return null;
1860: }
1861:
1862: /**
1863: * @return Returns the dynamicPrecision.
1864: */
1865: public int getDynamicPrecision() {
1866: return dynamicPrecision;
1867: }
1868:
1869: /**
1870: * Returns the schemaName value.
1871: *
1872: * @return Returns the schemaName.
1873: */
1874: public String getSchemaName() {
1875: return schemaName;
1876: }
1877:
1878: /**
1879: * Returns the default Stored Procedure Semantic value.
1880: *
1881: * @return Returns the defaultStoredProcedureSemantic.
1882: */
1883: // TODO nobody uses this method. Should be removed
1884: public DatabaseProcedureSemantic getDefaultStoredProcedureSemantic() {
1885: return defaultStoredProcedureSemantic;
1886: }
1887:
1888: /**
1889: * Set the default Stored Procedure semantic definition.
1890: *
1891: * @param defaultSemantic the default semantic
1892: */
1893: public void setDefaultStoredProcedureSemantic(
1894: DatabaseProcedureSemantic defaultSemantic) {
1895: this .defaultStoredProcedureSemantic = defaultSemantic;
1896: this .defaultStoredProcedureSemantic.setUseDefaultSemantic(true);
1897: }
1898:
1899: /**
1900: * Get all the names of tables of this database <b>NOTE</b>: The returned
1901: * collection will contain two entries per actual table: one with the table
1902: * name alone, the other prefixed by the schema name + ".".
1903: *
1904: * @see org.continuent.sequoia.common.sql.schema.DatabaseSchema#addTable(DatabaseTable)
1905: * @return <code>Collection</code> of <code>DatabaseTable</code>
1906: */
1907: public Collection getTables() {
1908: DatabaseSchema schemaPtr = getDatabaseSchema();
1909: if (schemaPtr == null)
1910: throw new NullPointerException(Translate
1911: .get("backend.schema.not.set"));
1912: return schemaPtr.getTables().values();
1913: }
1914:
1915: /**
1916: * Returns <code>true</code> if this backend has the given table in its
1917: * schema. The caller must ensure that the database schema has been defined,
1918: * using the {@link #setDatabaseSchema(DatabaseSchema, boolean)}or
1919: * {@link #checkDatabaseSchema()}
1920: *
1921: * @param table The table name to look for
1922: * @return <code>true</code> if tables is found in the schema
1923: */
1924: public boolean hasTable(String table) {
1925: DatabaseSchema schemaPtr = getDatabaseSchema();
1926: if (schemaPtr == null)
1927: throw new NullPointerException(Translate
1928: .get("backend.schema.not.set"));
1929:
1930: return schemaPtr.hasTable(table);
1931: }
1932:
1933: /**
1934: * Returns <code>true</code> if this backend has the given list of tables in
1935: * its schema. The caller must ensure that the database schema has been
1936: * defined, using the {@link #setDatabaseSchema(DatabaseSchema, boolean)}or
1937: * {@link #checkDatabaseSchema()}methods.
1938: *
1939: * @param tables the list of table names (<code>Collection</code> of
1940: * <code>String</code>) to look for
1941: * @return <code>true</code> if all the tables are found
1942: */
1943: public boolean hasTables(Collection tables) {
1944: DatabaseSchema schemaPtr = getDatabaseSchema();
1945: if (schemaPtr == null)
1946: throw new NullPointerException(Translate
1947: .get("backend.schema.not.set"));
1948:
1949: if (tables == null)
1950: throw new IllegalArgumentException(Translate
1951: .get("backend.null.tables"));
1952:
1953: for (Iterator iter = tables.iterator(); iter.hasNext();) {
1954: if (!schemaPtr.hasTable((String) iter.next()))
1955: return false;
1956: }
1957: return true;
1958: }
1959:
1960: /**
1961: * Returns <code>true</code> if this backend has the given stored procedure
1962: * in its schema. The caller must ensure that the database schema has been
1963: * defined, using the {@link #setDatabaseSchema(DatabaseSchema, boolean)}or
1964: * {@link #checkDatabaseSchema()}
1965: *
1966: * @param procedureName The stored procedure name to look for
1967: * @param nbOfParameters number of parameters of the stored procecdure
1968: * @return <code>true</code> if the procedure has been found
1969: */
1970: public boolean hasStoredProcedure(String procedureName,
1971: int nbOfParameters) {
1972: DatabaseSchema schemaPtr = getDatabaseSchema();
1973: if (schemaPtr == null)
1974: throw new NullPointerException(Translate
1975: .get("backend.schema.not.set"));
1976:
1977: return schemaPtr.hasProcedure(procedureName, nbOfParameters);
1978: }
1979:
1980: /**
1981: * Returns the gatherSystemTables value.
1982: *
1983: * @return Returns the gatherSystemTables.
1984: */
1985: public boolean isGatherSystemTables() {
1986: return gatherSystemTables;
1987: }
1988:
1989: /**
1990: * Returns the schemaIsDirty value.
1991: *
1992: * @return Returns true if the backend database schema is dirty and needs a
1993: * refresh.
1994: */
1995: // TODO nobody uses this method. Should be removed
1996: public boolean isSchemaDirty() {
1997: return schemaIsDirty;
1998: }
1999:
2000: /**
2001: * @return Returns the schemaIsStatic.
2002: */
2003: public boolean isSchemaStatic() {
2004: return schemaIsStatic;
2005: }
2006:
2007: /**
2008: * Erase the current schema and force a re-fetch of all the meta data
2009: *
2010: * @param c optional connection from which the schema should be fetched (null
2011: * if not applicable)
2012: */
2013: private synchronized void refreshSchema(Connection c) {
2014: if (isSchemaStatic())
2015: return;
2016: DatabaseSchema oldSchema = schema;
2017: setDatabaseSchema(null, isSchemaStatic());
2018: checkDatabaseSchema(c); // set dirty to false as well
2019: // Schema is null if we failed to refresh it
2020: if ((schema != null) && (oldSchema != null)) {
2021: schema.setLocks(oldSchema);
2022: }
2023: }
2024:
2025: /**
2026: * Sets the database schema.
2027: *
2028: * @param databaseSchema the schema to set
2029: * @param isStatic <code>true</code> if the schema should be static
2030: * @see #getDatabaseSchema()
2031: */
2032: public synchronized void setDatabaseSchema(
2033: DatabaseSchema databaseSchema, boolean isStatic) {
2034: if (schema == null) {
2035: schemaIsStatic = isStatic;
2036: schema = databaseSchema;
2037: } else {
2038: if (!isStatic)
2039: schema = databaseSchema;
2040: }
2041: }
2042:
2043: /**
2044: * Set the amount of information that must be gathered when fetching database
2045: * schema information.
2046: *
2047: * @param dynamicPrecision The dynamicPrecision to set.
2048: * @param gatherSystemTables True if we must gather system tables
2049: * @param schemaName Schema name to use to gather tables
2050: */
2051: public void setDynamicPrecision(int dynamicPrecision,
2052: boolean gatherSystemTables, String schemaName) {
2053: this .dynamicPrecision = dynamicPrecision;
2054: this .gatherSystemTables = gatherSystemTables;
2055: this .schemaName = schemaName;
2056: }
2057:
2058: /**
2059: * Sets the schemaIsDirty value if the backend schema needs to be refreshed.
2060: *
2061: * @param schemaIsDirty The schemaIsDirty to set.
2062: * @param request optional request information used to retrieve the connection
2063: * from which the schema should be fetched (null if not applicable)
2064: */
2065: public void setSchemaIsDirty(boolean schemaIsDirty,
2066: AbstractRequest request) {
2067: if (request == null) {
2068: this .schemaIsDirty = schemaIsDirty;
2069: return;
2070: }
2071:
2072: // Try to retrieve the connection corresponding to the persistent connection
2073: // or transaction if applicable
2074: Connection c = null;
2075: PooledConnection pc = null;
2076: if (request.isPersistentConnection()) {
2077: AbstractConnectionManager cm = getConnectionManager(request
2078: .getLogin());
2079: if (cm != null)
2080: try {
2081: pc = cm.retrieveConnectionInAutoCommit(request);
2082: if (pc != null)
2083: c = pc.getConnection();
2084: } catch (UnreachableBackendException ignore) {
2085: }
2086: } else if (!request.isAutoCommit()) {
2087: AbstractConnectionManager cm = getConnectionManager(request
2088: .getLogin());
2089: if (cm != null)
2090: pc = cm.retrieveConnectionForTransaction(request
2091: .getTransactionId());
2092: if (pc != null)
2093: c = pc.getConnection();
2094: }
2095:
2096: if (c == null)
2097: this .schemaIsDirty = schemaIsDirty;
2098: else
2099: // refresh schema right away
2100: refreshSchema(c);
2101: }
2102:
2103: /**
2104: * Sets the schemaIsNeededByVdb value.
2105: *
2106: * @param schemaIsNeededByVdb The schemaIsNeededByVdb to set.
2107: */
2108: public void setSchemaIsNeededByVdb(boolean schemaIsNeededByVdb) {
2109: this .schemaIsNeededByVdb = schemaIsNeededByVdb;
2110: }
2111:
2112: /**
2113: * Update the DatabaseBackend schema definition according to the successful
2114: * execution of the provided request. Note that the schema is only updated it
2115: * the provided request is a DDL statement.
2116: * <p>
2117: * TODO: a full refresh is forced on CREATE request to be sure to properly
2118: * handle foreign keys (see SEQUOIA-581). An improvement would be not to
2119: * refresh the whole schema on each create request - just add the new table
2120: * and fetch its dependencies (foreign keys) by reloading exported keys of
2121: * existing tables (see SEQUOIA-xxx). Similar optimization may be done on
2122: * ALTER requests but it would require to be able to parse the request -
2123: * currently we just force a full schema refresh. Note that concerning DROP
2124: * request, we remove the table from dependending tables of other tables in
2125: * addition to the removal of the table from the schema (given t1 and t2
2126: * tables, if t2 has a fk referencing t1, the table t1 will have the table t2
2127: * in its dependending tables). In case of rollback, these modification will
2128: * be cancelled because the schema will be refreshed.
2129: *
2130: * @param request the request that possibly updates the schema
2131: */
2132: public void updateDatabaseBackendSchema(AbstractWriteRequest request) {
2133: if (!request.altersSomething()
2134: || !request.altersDatabaseSchema())
2135: return;
2136:
2137: // Update schema
2138: if (schemaIsNeededByVdb) {
2139: if (request.isCreate()) { // Add the table to the schema
2140: DatabaseSchema dbs = getDatabaseSchema();
2141: if (dbs != null) {
2142: CreateRequest createRequest = (CreateRequest) request;
2143: if (createRequest.altersDatabaseSchema()) {
2144: if (createRequest.getDatabaseTable() != null) {
2145: DatabaseTable t = new DatabaseTable(
2146: createRequest.getDatabaseTable());
2147: dbs.addTable(t);
2148: if (logger.isDebugEnabled())
2149: logger
2150: .debug("Added table '"
2151: + t.getName()
2152: + "' to backend database schema");
2153: /*
2154: * Set the inverse relationships for the depending tables
2155: */
2156: if (t.getDependingTables() != null) {
2157: for (Iterator i = t
2158: .getDependingTables()
2159: .iterator(); i.hasNext();) {
2160: String rtn = (String) i.next();
2161: DatabaseTable rt = dbs
2162: .getTable(rtn);
2163: if (rt != null)
2164: rt.addDependingTable(t
2165: .getName());
2166:
2167: }
2168: }
2169: }
2170: }
2171: }
2172: // A full refresh is forced on CREATE request (see comment above)
2173: setSchemaIsDirty(true, request);
2174: } else if (request.isDrop()) { // Delete the table(s) from the schema
2175: DatabaseSchema dbs = getDatabaseSchema();
2176: if (dbs != null) {
2177: // DatabaseTable t = dbs.getTable(request.getTableName());
2178: SortedSet tablesToRemove = ((DropRequest) request)
2179: .getTablesToDrop();
2180: if (tablesToRemove != null)
2181: for (Iterator iter = tablesToRemove.iterator(); iter
2182: .hasNext();) {
2183: String tableToRemove = (String) iter.next();
2184: DatabaseTable t = dbs
2185: .getTable(tableToRemove);
2186: if (t != null) {
2187: // Remove table from schema
2188: dbs.removeTable(t);
2189: if (logger.isDebugEnabled())
2190: logger
2191: .debug("Removed table '"
2192: + t.getName()
2193: + "' from backend database schema");
2194:
2195: // Remove table from depending tables
2196: if (logger.isDebugEnabled())
2197: logger
2198: .debug("Removing table '"
2199: + t.getName()
2200: + "' from dependending tables in request manager database schema");
2201: getDatabaseSchema()
2202: .removeTableFromDependingTables(
2203: t);
2204: }
2205: }
2206: return;
2207: }
2208: }
2209: // Unsupported force re-fetch from db
2210: setSchemaIsDirty(true, request);
2211: } else
2212: // Unsupported force re-fetch from db
2213: setSchemaIsDirty(true, request);
2214: }
2215:
2216: //
2217: // Rewriting rules
2218: //
2219:
2220: /**
2221: * Add a <code>AbstractRewritingRule</code> at the end of the rule list.
2222: *
2223: * @param rule a AbstractRewritingRule
2224: */
2225: public void addRewritingRule(AbstractRewritingRule rule) {
2226: if (rewritingRules == null)
2227: rewritingRules = new ArrayList();
2228: if (logger.isDebugEnabled())
2229: logger.debug(Translate.get("backend.rewriting.rule.add",
2230: new String[] { rule.getQueryPattern(),
2231: rule.getRewrite() }));
2232: rewritingRules.add(rule);
2233: }
2234:
2235: /**
2236: * Rewrite the current query according to the rewriting rules.
2237: *
2238: * @param sqlQuery request to rewrite
2239: * @return the rewritten SQL query according to rewriting rules.
2240: */
2241: public String rewriteQuery(String sqlQuery) {
2242: if (rewritingRules == null)
2243: return sqlQuery;
2244: int size = rewritingRules.size();
2245: for (int i = 0; i < size; i++) {
2246: AbstractRewritingRule rule = (AbstractRewritingRule) rewritingRules
2247: .get(i);
2248: sqlQuery = rule.rewrite(sqlQuery);
2249: if (rule.hasMatched()) { // Rule matched, query rewriten
2250: if (logger.isDebugEnabled())
2251: logger.debug(Translate.get(
2252: "backend.rewriting.query", sqlQuery));
2253: if (rule.isStopOnMatch())
2254: break; // Ok, stop here.
2255: }
2256: }
2257: return sqlQuery;
2258: }
2259:
2260: /*
2261: * Debug/Monitoring
2262: */
2263:
2264: /**
2265: * Adds a pending request to this backend.<br />
2266: * Method is synchronized on <code>pendingRequests</code> field.
2267: *
2268: * @param request the request to add
2269: * @see #pendingRequests
2270: */
2271: public void addPendingReadRequest(AbstractRequest request) {
2272: synchronized (pendingRequests) {
2273: totalRequest++;
2274: totalReadRequest++;
2275: pendingRequests.add(request);
2276: }
2277: }
2278:
2279: /**
2280: * Adds a pending request to this backend.<br />
2281: * Method is synchronized on <code>pendingTasks</code> field.
2282: *
2283: * @param task the task to add
2284: * @see #pendingTasks
2285: */
2286: public void addPendingTask(AbstractTask task) {
2287: synchronized (pendingTasks) {
2288: totalTasks++;
2289: pendingTasks.add(task);
2290: }
2291: }
2292:
2293: /**
2294: * Adds a pending write request to this backend.<br />
2295: * Method is synchronized on <code>pendingRequests</code> field.
2296: *
2297: * @param request the request to add
2298: * @see #pendingRequests
2299: */
2300: public void addPendingWriteRequest(AbstractRequest request) {
2301: synchronized (pendingRequests) {
2302: totalRequest++;
2303: totalWriteRequest++;
2304: pendingRequests.add(request);
2305: }
2306: }
2307:
2308: /**
2309: * @return Returns the activeTransactions.
2310: */
2311: public ArrayList getActiveTransactions() {
2312: return activeTransactions;
2313: }
2314:
2315: /**
2316: * Get data about this backend. Format is:
2317: *
2318: * <pre>
2319: * data[0] = this.name;
2320: * data[1] = this.driverClassName;
2321: * data[2] = this.url;
2322: * data[3] = String.valueOf(this.activeTransactions.size());
2323: * data[4] = String.valueOf(this.pendingRequests.size());
2324: * data[5] = String.valueOf(this.isReadEnabled());
2325: * data[6] = String.valueOf(this.isWriteEnabled());
2326: * data[7] = String.valueOf(this.isInitialized());
2327: * data[8] = String.valueOf(this.schemaIsStatic);
2328: * data[9] = String.valueOf(this.connectionManagers.size());
2329: * data[10] = String.valueOf(getTotalActiveConnections());
2330: * data[11] = String.valueOf(totalRequest);
2331: * data[12] = String.valueOf(totalTransactions);
2332: * data[13] = lastKnownCheckpoint;
2333: *</pre>
2334: *
2335: * @return an array of strings
2336: */
2337: public String[] getBackendData() {
2338: String[] data = new String[14];
2339: data[0] = this .name;
2340: data[1] = this .driverClassName;
2341: data[2] = this .url;
2342: data[3] = String.valueOf(this .activeTransactions.size());
2343: data[4] = String.valueOf(this .pendingRequests.size());
2344: data[5] = String.valueOf(this .isReadEnabled());
2345: data[6] = String.valueOf(this .isWriteEnabled());
2346: try {
2347: data[7] = String.valueOf(this .isInitialized());
2348: } catch (Exception e) {
2349: data[7] = "unknown";
2350: }
2351: data[8] = String.valueOf(this .schemaIsStatic);
2352:
2353: data[9] = String.valueOf(this .connectionManagers.size());
2354: data[10] = String.valueOf(getTotalActiveConnections());
2355: data[11] = String.valueOf(totalRequest);
2356: data[12] = String.valueOf(totalTransactions);
2357: if (lastKnownCheckpoint == null
2358: || lastKnownCheckpoint.equalsIgnoreCase(""))
2359: data[13] = "<unknown>";
2360: else
2361: data[13] = lastKnownCheckpoint;
2362: return data;
2363: }
2364:
2365: /**
2366: * Get the statistics of the backend.
2367: *
2368: * @return a BackendStatistics
2369: */
2370: public BackendStatistics getBackendStats() {
2371: BackendStatistics stats = new BackendStatistics();
2372: stats.setBackendName(name);
2373: stats.setDriverClassName(driverClassName);
2374: stats.setUrl(url);
2375: stats.setNumberOfActiveTransactions(activeTransactions.size());
2376: stats.setNumberOfPendingRequests(pendingRequests.size());
2377: stats.setNumberOfPersistentConnections(persistentConnections
2378: .size());
2379: stats.setReadEnabled(isReadEnabled());
2380: stats.setWriteEnabled(isWriteEnabled());
2381: String initializationStatus = "<unknown>";
2382: try {
2383: initializationStatus = String.valueOf(this .isInitialized());
2384: } catch (Exception e) {
2385: }
2386: stats.setInitializationStatus(initializationStatus);
2387: stats.setSchemaStatic(schemaIsStatic);
2388: stats.setNumberOfConnectionManagers(connectionManagers.size());
2389: stats
2390: .setNumberOfTotalActiveConnections(getTotalActiveConnections());
2391: stats.setNumberOfTotalRequests(totalRequest);
2392: stats.setNumberOfTotalTransactions(totalTransactions);
2393: if (lastKnownCheckpoint == null
2394: || lastKnownCheckpoint.equalsIgnoreCase(""))
2395: stats.setLastKnownCheckpoint("<unknown>");
2396: else
2397: stats.setLastKnownCheckpoint(lastKnownCheckpoint);
2398: return stats;
2399: }
2400:
2401: /**
2402: * Returns the list of pending requests for this backend.
2403: *
2404: * @return <code>Vector</code> of <code>AbstractRequests</code> or
2405: * <code>AbstractTask</code> objects
2406: */
2407: public Vector getPendingRequests() {
2408: return pendingRequests;
2409: }
2410:
2411: /**
2412: * Returns the list of pending requests for this backend.
2413: *
2414: * @param count number of requests to retrieve, if 0, return all.
2415: * @param fromFirst count the request from first if true, or from last if
2416: * false
2417: * @param clone should clone the pending request if true, block it if false
2418: * @return <code>ArrayList</code> of <code>String</code> description of
2419: * each request.
2420: */
2421: public ArrayList getPendingRequestsDescription(int count,
2422: boolean fromFirst, boolean clone) {
2423: int size = pendingRequests.size();
2424: int limit = (count == 0 || count > size) ? size : Math.min(
2425: size, count);
2426: ArrayList list = new ArrayList(limit);
2427: int start = (fromFirst) ? 0 : Math.min(limit - count, 0);
2428: if (!clone) {
2429: synchronized (pendingRequests) {
2430: for (int i = start; i < limit; i++)
2431: list.add(pendingRequests.get(i).toString());
2432: }
2433: return list;
2434: } else {
2435: Vector cloneVector = (Vector) pendingRequests.clone();
2436: for (int i = start; i < limit; i++)
2437: list.add(cloneVector.get(i).toString());
2438: return list;
2439: }
2440: }
2441:
2442: /**
2443: * Get the total number of active connections for this backend
2444: *
2445: * @return number of active connections for all
2446: * <code>AbstractConnectionManager</code> connected to this backend
2447: */
2448: public long getTotalActiveConnections() {
2449: int activeConnections = 0;
2450: Iterator iter = connectionManagers.keySet().iterator();
2451: while (iter.hasNext())
2452: activeConnections += ((AbstractConnectionManager) connectionManagers
2453: .get(iter.next())).getCurrentNumberOfConnections();
2454: return activeConnections;
2455: }
2456:
2457: /**
2458: * Returns the total number of transactions executed by this backend.
2459: *
2460: * @return Total number of transactions.
2461: */
2462: public int getTotalTransactions() {
2463: return totalTransactions;
2464: }
2465:
2466: /**
2467: * Returns the total number of read requests executed by this backend.
2468: *
2469: * @return Returns the totalReadRequest.
2470: */
2471: public int getTotalReadRequest() {
2472: return totalReadRequest;
2473: }
2474:
2475: /**
2476: * Returns the total number of write requests executed by this backend.
2477: *
2478: * @return Returns the totalWriteRequest.
2479: */
2480: public int getTotalWriteRequest() {
2481: return totalWriteRequest;
2482: }
2483:
2484: /**
2485: * Returns the total number of requests executed by this backend.
2486: *
2487: * @return Returns the totalRequest.
2488: */
2489: public int getTotalRequest() {
2490: return totalRequest;
2491: }
2492:
2493: /**
2494: * Removes a pending request from this backend. Note that the underlying
2495: * vector is synchronized.
2496: *
2497: * @param request the request to remove
2498: * @return <code>true</code> if the request has been found and removed
2499: */
2500: public boolean removePendingRequest(AbstractRequest request) {
2501: return pendingRequests.remove(request);
2502: }
2503:
2504: /**
2505: * Removes a pending task from this backend. Note that the underlying vector
2506: * is synchronized.
2507: *
2508: * @param task the task to remove
2509: * @return <code>true</code> if the task has been found and removed
2510: */
2511: public boolean removePendingTask(AbstractTask task) {
2512: return pendingTasks.remove(task);
2513: }
2514:
2515: //
2516: // Getters/Setters
2517: //
2518:
2519: /**
2520: * Returns the databaseProductName value.
2521: *
2522: * @return Returns the databaseProductName.
2523: */
2524: public String getDatabaseProductName() {
2525: return driverCompliance.getDatabaseProductName();
2526: }
2527:
2528: /**
2529: * @return the driver compliance to Sequoia requirements.
2530: */
2531: public DriverCompliance getDriverCompliance() {
2532: return driverCompliance;
2533: }
2534:
2535: /**
2536: * Returns the driver path.
2537: *
2538: * @return the driver path
2539: */
2540: public String getDriverPath() {
2541: return driverPath;
2542: }
2543:
2544: /**
2545: * Returns the database native JDBC driver class name.
2546: *
2547: * @return the driver class name
2548: */
2549: public String getDriverClassName() {
2550: return driverClassName;
2551: }
2552:
2553: /**
2554: * Returns the backend logical name.
2555: *
2556: * @return the backend logical name
2557: */
2558: public String getName() {
2559: return name;
2560: }
2561:
2562: /**
2563: * Returns the full qualified name of this backend.
2564: *
2565: * @return a String representing the full qualified name of this backend.
2566: */
2567: private String getQualifiedName() {
2568: return virtualDatabaseName + "." + name;
2569: }
2570:
2571: /**
2572: * Return the sql short form length to use when reporting an error.
2573: *
2574: * @return sql short form length
2575: * @see org.continuent.sequoia.controller.requests.AbstractRequest#getSqlShortForm(int)
2576: */
2577: public int getSqlShortFormLength() {
2578: return sqlShortFormLength;
2579: }
2580:
2581: /**
2582: * Returns the JDBC URL used to access the database.
2583: *
2584: * @return a JDBC URL
2585: */
2586: public String getURL() {
2587: return url;
2588: }
2589:
2590: /**
2591: * Returns the taskQueues value.
2592: *
2593: * @return Returns the taskQueues.
2594: */
2595: public BackendTaskQueues getTaskQueues() {
2596: return taskQueues;
2597: }
2598:
2599: /**
2600: * Sets the taskQueues value.
2601: *
2602: * @param taskQueues The taskQueues to set.
2603: */
2604: public void setTaskQueues(BackendTaskQueues taskQueues) {
2605: this .taskQueues = taskQueues;
2606: }
2607:
2608: /**
2609: * Returns the virtual database name this backend belongs to.
2610: *
2611: * @return Returns the virtual database name.
2612: */
2613: public String getVirtualDatabaseName() {
2614: return virtualDatabaseName;
2615: }
2616:
2617: //
2618: // XML mapping
2619: //
2620:
2621: /**
2622: * Get xml information about this backend.
2623: *
2624: * @return xml formatted information on this database backend.
2625: */
2626: public synchronized String getXml() {
2627: // escape & XML entity and replace it by & so that the
2628: // url attribute is XML compliant
2629: String escapedUrl = url.replaceAll("&", "&");
2630:
2631: StringBuffer info = new StringBuffer();
2632: info.append("<" + DatabasesXmlTags.ELT_DatabaseBackend + " "
2633: + DatabasesXmlTags.ATT_name + "=\"" + name + "\" "
2634: + DatabasesXmlTags.ATT_driver + "=\"" + driverClassName
2635: + "\" " + DatabasesXmlTags.ATT_url + "=\"" + escapedUrl
2636: + "\" " + DatabasesXmlTags.ATT_connectionTestStatement
2637: + "=\"" + connectionTestStatement + "\">");
2638:
2639: boolean expandSchema = this .schema != null
2640: && dynamicPrecision == DatabaseBackendSchemaConstants.DynamicPrecisionStatic;
2641:
2642: info.append(getSchemaXml(expandSchema));
2643:
2644: if (rewritingRules != null) {
2645: int size = rewritingRules.size();
2646: for (int i = 0; i < size; i++)
2647: info.append(((AbstractRewritingRule) rewritingRules
2648: .get(i)).getXml());
2649: }
2650: if (connectionManagers != null) {
2651: if (connectionManagers.isEmpty() == false) {
2652: AbstractConnectionManager connectionManager;
2653: Iterator iter = connectionManagers.values().iterator();
2654: while (iter.hasNext()) {
2655: connectionManager = (AbstractConnectionManager) iter
2656: .next();
2657: info.append(connectionManager.getXml());
2658: }
2659: }
2660: }
2661: info.append("</" + DatabasesXmlTags.ELT_DatabaseBackend + ">");
2662: return info.toString();
2663: }
2664:
2665: /**
2666: * The getXml() method does not return the schema if it is not static anymore,
2667: * to avoid confusion between static and dynamic schema. This method returns a
2668: * static view of the schema, whatever the dynamic precision is.
2669: *
2670: * @param expandSchema if we should force the schema to be expanded. This is
2671: * needed as the default getXml should call this method.
2672: * @return an xml formatted string
2673: */
2674: public String getSchemaXml(boolean expandSchema) {
2675: StringBuffer info = new StringBuffer();
2676: info.append("<"
2677: + DatabasesXmlTags.ELT_DatabaseSchema
2678: + " "
2679: + DatabasesXmlTags.ATT_dynamicPrecision
2680: + "=\""
2681: + DatabaseBackendSchemaConstants
2682: .getDynamicSchemaLevel(dynamicPrecision)
2683: + "\" " + DatabasesXmlTags.ATT_gatherSystemTables
2684: + "=\"" + (gatherSystemTables ? "true" : "false")
2685: + "\">");
2686: synchronized (this ) {
2687: if (expandSchema && (schema != null))
2688: info.append(schema.getXml());
2689: }
2690: info.append("</" + DatabasesXmlTags.ELT_DatabaseSchema + ">");
2691: return info.toString();
2692: }
2693:
2694: /**
2695: * Sets the sqlShortFormLength value.
2696: *
2697: * @param sqlShortFormLength The sqlShortFormLength to set.
2698: */
2699: public void setSqlShortFormLength(int sqlShortFormLength) {
2700: this .sqlShortFormLength = sqlShortFormLength;
2701: }
2702:
2703: /**
2704: * String description
2705: *
2706: * @return a string description of the backend.
2707: */
2708: public String toString() {
2709: return "Backend: Name[" + this .name + "] State[" + this .state
2710: + "] JDBCConnected[" + isJDBCConnected()
2711: + "] ActiveTransactions[" + activeTransactions.size()
2712: + "] PersistentConnections[" + persistentConnections
2713: + "] PendingRequests[" + pendingRequests.size() + "]";
2714: }
2715:
2716: //
2717: // Worker threads management
2718: //
2719:
2720: /**
2721: * Return the first available BackendWorkerThread. This should only be used
2722: * for notification of request abort.
2723: *
2724: * @return a BackendWorkerThread or null if none exists
2725: */
2726: public BackendWorkerThread getBackendWorkerThreadForNotification() {
2727: if ((workerThreads == null) || workerThreads.isEmpty())
2728: return null;
2729: return (BackendWorkerThread) workerThreads.get(0);
2730: }
2731:
2732: /**
2733: * Returns the nbOfWorkerThreads value.
2734: *
2735: * @return Returns the nbOfWorkerThreads.
2736: */
2737: public int getNbOfWorkerThreads() {
2738: return nbOfWorkerThreads;
2739: }
2740:
2741: /**
2742: * Sets the nbOfWorkerThreads value.
2743: *
2744: * @param nbOfWorkerThreads The nbOfWorkerThreads to set.
2745: */
2746: // TODO nobody uses this method. Should be removed
2747: public void setNbOfWorkerThreads(int nbOfWorkerThreads) {
2748: this .nbOfWorkerThreads = nbOfWorkerThreads;
2749: }
2750:
2751: /**
2752: * Start a new Deadlock Detection Thread (throws a RuntimeException if called
2753: * twice without stopping the thread before the second call).
2754: *
2755: * @param vdb the virtual database the backend is attached to
2756: */
2757: public void startDeadlockDetectionThread(VirtualDatabase vdb) {
2758: taskQueues.startDeadlockDetectionThread(vdb);
2759:
2760: }
2761:
2762: /**
2763: * Start the BackendWorkerThreads for this backend.
2764: *
2765: * @param loadBalancer load balancer requesting the activation
2766: */
2767: public void startWorkerThreads(AbstractLoadBalancer loadBalancer) {
2768: taskQueues.setAllowTasksToBePosted(true);
2769: synchronized (workerThreadSync) {
2770: if (logger.isDebugEnabled())
2771: logger
2772: .debug(Translate
2773: .get(
2774: "loadbalancer.backend.workerthread.starting",
2775: new String[] {
2776: String
2777: .valueOf(nbOfWorkerThreads),
2778: name }));
2779:
2780: if (workerThreads == null)
2781: workerThreads = new ArrayList();
2782: // Create worker threads
2783: for (int i = 0; i < nbOfWorkerThreads; i++) {
2784: BackendWorkerThread thread = new BackendWorkerThread(
2785: this , loadBalancer);
2786: workerThreads.add(thread);
2787: // Dedicate the first thread to commit/rollback operations
2788: thread.setPlayCommitRollbackOnly(i == 0);
2789: thread.start();
2790: }
2791: }
2792: }
2793:
2794: /**
2795: * Terminate all worker threads.
2796: */
2797: public void terminateWorkerThreads() {
2798: terminateWorkerThreads(true);
2799: }
2800:
2801: /**
2802: * Terminate all worker threads.
2803: *
2804: * @param wait if true waits for worker threads to terminate before returning
2805: */
2806: public void terminateWorkerThreads(boolean wait) {
2807: synchronized (workerThreadSync) {
2808: if (workerThreads == null)
2809: return;
2810:
2811: // Terminate worker threads by posting a kill task for each of them
2812: int size = workerThreads.size();
2813:
2814: if (logger.isDebugEnabled())
2815: logger.debug(Translate.get(
2816: "loadbalancer.backend.workerthread.stopping",
2817: new String[] { String.valueOf(size), name }));
2818:
2819: for (int i = 0; i < size; i++) {
2820: KillThreadTask killBlockingThreadTask = new KillThreadTask(
2821: 1, 1);
2822: taskQueues
2823: .addTaskToBackendTotalOrderQueue(killBlockingThreadTask);
2824: }
2825:
2826: if (wait)
2827: // Wait for thread termination
2828: for (Iterator iter = workerThreads.iterator(); iter
2829: .hasNext();) {
2830: BackendWorkerThread thread = (BackendWorkerThread) iter
2831: .next();
2832: if (thread != Thread.currentThread()) { // Do not try to wait for self if we are the one who started the
2833: // disabling.
2834: try {
2835: thread.join();
2836: } catch (InterruptedException ignore) {
2837: }
2838: }
2839: }
2840:
2841: // Remove the threads from the list
2842: workerThreads.clear();
2843:
2844: // Cleanup what could remain in the queues
2845: taskQueues.abortRemainingRequests();
2846: }
2847: }
2848:
2849: /**
2850: * Terminate the Deadlock Detection Thread. Throws a RuntimeException is the
2851: * thread was already stopped (or not started).
2852: */
2853: public void terminateDeadlockDetectionThread() {
2854: taskQueues.terminateDeadlockDetectionThread();
2855: }
2856:
2857: /**
2858: * Convert a <code><DatabaseBackend>List</code> to a
2859: * <code><BackendInfo>List</code>.
2860: * <p>
2861: * The DatabaseBackend objects cannot be serialized because they are used as
2862: * MBean and notification emitters, so we want to extract the BackendInfo
2863: * (which are <code>Serializable</code>) out of them.
2864: * </p>
2865: * <p>
2866: * <strong>This method does not keep the XML configuration of the BackendInfo
2867: * objects. Subsequent calls to getXml() on BackendInfos returned by this
2868: * method will always return <code>null</code>
2869: * </p>
2870: *
2871: * @param backends a <code>List</code> of <code>DatabaseBackends</code>
2872: * @return a <code>List</code> of <code>BackendInfo</code> (possibly empty
2873: * if the list of backends was <code>null</code>
2874: * @see BackendInfo#toDatabaseBackends(List)
2875: */
2876: public static/* <BackendInfo> */List toBackendInfos(
2877: /* <DatabaseBackend> */List backends) {
2878: if (backends == null) {
2879: return new ArrayList();
2880: }
2881: List backendInfos = new ArrayList(backends.size());
2882: for (Iterator iter = backends.iterator(); iter.hasNext();) {
2883: DatabaseBackend backend = (DatabaseBackend) iter.next();
2884: BackendInfo backendInfo = new BackendInfo(backend);
2885: // we do not keep the XML configuration in the BackendInfo
2886: // FIXME I did that to mimic current behavior but I don't see the
2887: // reason why (maybe the size of XML to transfer on the wire?)
2888: backendInfo.setXml(null);
2889: backendInfos.add(backendInfo);
2890: }
2891: return backendInfos;
2892: }
2893:
2894: /**
2895: * Check if the given vdb user is a valid user for this backend.
2896: *
2897: * @param vdbUser to be checked.
2898: * @return true if the vdb user is valid, false otherwise.
2899: */
2900: public boolean isValidBackendUser(VirtualDatabaseUser vdbUser) {
2901: Connection conn = null;
2902: try {
2903: conn = DriverManager.getConnection(url, vdbUser.getLogin(),
2904: vdbUser.getPassword(), driverPath, driverClassName);
2905: return true;
2906: } catch (SQLException ignore) {
2907: if (logger.isDebugEnabled()) {
2908: logger.debug("Failed to get connection using vdb user "
2909: + vdbUser.getLogin() + " as real user", ignore);
2910: }
2911: return false;
2912: } finally {
2913: if (conn != null) {
2914: try {
2915: conn.close();
2916: } catch (SQLException ignore) {
2917: // Silently ignore
2918: }
2919: }
2920: }
2921: }
2922:
2923: /**
2924: * Get the log4j logger for this backend
2925: *
2926: * @return the logger for this backend
2927: */
2928: public Trace getLogger() {
2929: return logger;
2930: }
2931: }
|