0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Copyright (C) 2005-2006 Continuent, Inc.
0007: * Contact: sequoia@continuent.org
0008: *
0009: * Licensed under the Apache License, Version 2.0 (the "License");
0010: * you may not use this file except in compliance with the License.
0011: * You may obtain a copy of the License at
0012: *
0013: * http://www.apache.org/licenses/LICENSE-2.0
0014: *
0015: * Unless required by applicable law or agreed to in writing, software
0016: * distributed under the License is distributed on an "AS IS" BASIS,
0017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018: * See the License for the specific language governing permissions and
0019: * limitations under the License.
0020: *
0021: * Initial developer(s): Emmanuel Cecchet.
0022: * Contributor(s): Nicolas Modrzyk, Jean-Bernard van Zuylen, Damian Arregui.
0023: * Refactored by Marc Herbert to remove the use of Java serialization.
0024: */package org.continuent.sequoia.controller.virtualdatabase;
0025:
0026: import java.io.EOFException;
0027: import java.io.IOException;
0028: import java.io.Serializable;
0029: import java.net.SocketException;
0030: import java.sql.SQLException;
0031: import java.sql.SQLWarning;
0032: import java.util.ArrayList;
0033: import java.util.HashMap;
0034: import java.util.Iterator;
0035: import java.util.List;
0036:
0037: import org.continuent.sequoia.common.exceptions.BadJDBCApiUsageException;
0038: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0039: import org.continuent.sequoia.common.exceptions.NoMoreControllerException;
0040: import org.continuent.sequoia.common.exceptions.NotImplementedException;
0041: import org.continuent.sequoia.common.exceptions.ProtocolException;
0042: import org.continuent.sequoia.common.exceptions.VDBisShuttingDownException;
0043: import org.continuent.sequoia.common.exceptions.driver.protocol.BackendDriverException;
0044: import org.continuent.sequoia.common.exceptions.driver.protocol.ControllerCoreException;
0045: import org.continuent.sequoia.common.i18n.Translate;
0046: import org.continuent.sequoia.common.log.Trace;
0047: import org.continuent.sequoia.common.protocol.Commands;
0048: import org.continuent.sequoia.common.protocol.SQLDataSerialization;
0049: import org.continuent.sequoia.common.protocol.TypeTag;
0050: import org.continuent.sequoia.common.protocol.SQLDataSerialization.Serializer;
0051: import org.continuent.sequoia.common.sql.Request;
0052: import org.continuent.sequoia.common.sql.RequestWithResultSetParameters;
0053: import org.continuent.sequoia.common.sql.metadata.MetadataContainer;
0054: import org.continuent.sequoia.common.sql.metadata.MetadataDescription;
0055: import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
0056: import org.continuent.sequoia.common.stream.DriverBufferedInputStream;
0057: import org.continuent.sequoia.common.stream.DriverBufferedOutputStream;
0058: import org.continuent.sequoia.common.users.VirtualDatabaseUser;
0059: import org.continuent.sequoia.common.util.Constants;
0060: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0061: import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0062: import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0063: import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0064: import org.continuent.sequoia.controller.core.Controller;
0065: import org.continuent.sequoia.controller.core.ControllerConstants;
0066: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
0067: import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
0068: import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
0069: import org.continuent.sequoia.controller.requests.AbstractRequest;
0070: import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0071: import org.continuent.sequoia.controller.requests.RequestFactory;
0072: import org.continuent.sequoia.controller.requests.SelectRequest;
0073: import org.continuent.sequoia.controller.requests.StoredProcedure;
0074: import org.continuent.sequoia.controller.requests.StoredProcedureCallResult;
0075: import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
0076: import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
0077: import org.continuent.sequoia.driver.Connection;
0078:
0079: /**
0080: * This class handles a connection with a Sequoia driver.
0081: *
0082: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0083: * @author <a href="mailto:Nicolas.Modrzyk@inria.fr">Nicolas Modrzyk </a>
0084: * @author <a href="mailto:Marc.Herbert@emicnetworks.com">Marc Herbert </a>
0085: * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0086: * </a>
0087: * @author <a href="mailto:damian.arregui@continuent.com">Damian Arregui
0088: * @version 2.0
0089: */
0090: public class VirtualDatabaseWorkerThread extends Thread {
0091: //
0092: // How the code is organized?
0093: //
0094: // 1. Member variables
0095: // 2. Constructor(s)
0096: // 3. Request management
0097: // 4. Getter/Setters
0098:
0099: /** <code>true</code> if this has been killed. */
0100: private boolean isKilled = false;
0101:
0102: /** Virtual database instantiating this thread. */
0103: private VirtualDatabase vdb;
0104:
0105: /** Logger instance. */
0106: private Trace logger = null;
0107:
// Streams of the client connection currently being served. Both are taken
// from the virtual database pending connection queue in run().
0108: private DriverBufferedInputStream in = null;
0109: private DriverBufferedOutputStream out = null;
0110:
// User built from the login and password read during authentication.
0111: private VirtualDatabaseUser user;
0112:
// Controller given to the constructor.
0113: private Controller controller;
0114:
// Set to true right before prompting the driver for its next command and
// back to false once the command code has been read.
0115: private boolean waitForCommand;
0116:
// Open ResultSets kept for streaming, looked up by the cursor name sent by
// the driver. Created in run().
0117: private HashMap streamedResultSets;
0118:
0119: private RequestFactory requestFactory = ControllerConstants.CONTROLLER_FACTORY
0120: .getRequestFactory();
0121: /**
0122: * The following variables represent the state of the connection with the
0123: * client
0124: */
// Persistent connection flag and identifier, read from the driver after
// authentication or when restoring the connection state.
0125: private boolean persistentConnection;
0126: private long persistentConnectionId;
// Set by restoreConnectionState() when the recovery log already holds a
// close for the persistent connection being restored.
0127: private boolean connectionHasClosed;
// Read from the driver after authentication.
0128: private boolean retrieveSQLWarnings;
// Identifier and status flags of the transaction in progress (if any).
0129: private long currentTid;
0130: private boolean transactionStarted;
0131: private boolean transactionHasAborted;
0132: private boolean queryExecutedInThisTransaction;
0133: private boolean writeQueryExecutedInThisTransaction;
0134: // Number of savepoints in the current transaction
0135: private int hasSavepoint;
// String form of the client socket address, or an error text when it could
// not be fetched.
0136: private String clientIpAddress;
// Login read from the driver during authentication.
0137: private String login;
// true once the current client connection is finished; ends the command loop
// of run().
0138: private boolean closed;
0139: private int transactionIsolation = Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL;
// NOTE(review): presumably driven by the SetReadOnly command - the handler is
// not visible here, confirm.
0140: private boolean isReadOnly = false;
// Line separator sent by the driver right after a successful authentication.
0141: private String connectionLineSeparator = null;
0142:
0143: /* end user logger */
0144: static Trace endUserLogger = Trace
0145: .getLogger("org.continuent.sequoia.enduser");
0146:
0147: /*
0148: * Constructor
0149: */
0150:
0151: /**
0152: * Creates a new <code>VirtualDatabaseWorkerThread</code> instance.
0153: *
0154: * @param controller the thread was originated from
0155: * @param vdb the virtual database instantiating this thread.
0156: */
0157: public VirtualDatabaseWorkerThread(Controller controller,
0158: VirtualDatabase vdb) {
0159: super ("VirtualDatabaseWorkerThread-"
0160: + vdb.getVirtualDatabaseName());
0161: this .vdb = vdb;
0162: this .controller = controller;
0163: try {
0164: this .logger = Trace
0165: .getLogger("org.continuent.sequoia.controller.virtualdatabase.VirtualDatabaseWorkerThread."
0166: + vdb.getVirtualDatabaseName());
0167: } catch (Exception e) {
0168: this .logger = vdb.logger;
0169: }
0170: }
0171:
0172: //
0173: // Decoding commands from the stream
0174: //
0175:
0176: /**
0177: * Gets a connection from the connection queue and process it.
0178: */
0179: public void run() {
0180: ArrayList vdbActiveThreads = vdb.getActiveThreads();
0181: ArrayList vdbPendingQueue = vdb.getPendingConnections();
0182: // List of open ResultSets for streaming. This is not synchronized since the
0183: // connection does only handle one request at a time
0184: streamedResultSets = new HashMap();
0185: boolean isActive = true;
0186:
0187: if (vdbActiveThreads == null) {
0188: logger
0189: .error("Got null active threads queue in VirtualDatabaseWorkerThread");
0190: isKilled = true;
0191: }
0192: if (vdbPendingQueue == null) {
0193: logger
0194: .error("Got null connection queue in VirtualDatabaseWorkerThread");
0195: isKilled = true;
0196: }
0197:
0198: // Main loop
0199: while (!isKilled) {
0200: // Get a connection from the pending queue
0201: synchronized (vdbPendingQueue) {
0202: while (vdbPendingQueue.isEmpty()) {
0203: if (!vdb.poolConnectionThreads) { // User does not want thread pooling, kill this thread!
0204: isKilled = true;
0205: break;
0206: }
0207: boolean timeout = false;
0208: try {
0209: if (isActive) {
0210: isActive = false;
0211: // Remove ourselves from the active thread list
0212: synchronized (vdbActiveThreads) {
0213: vdbActiveThreads.remove(this );
0214: vdb.incrementIdleThreadCount();
0215: }
0216: }
0217: long before = System.currentTimeMillis();
0218: vdbPendingQueue
0219: .wait(vdb.getMaxThreadIdleTime());
0220: long now = System.currentTimeMillis();
0221: // Check if timeout has expired
0222: timeout = now - before >= vdb
0223: .getMaxThreadIdleTime();
0224: } catch (InterruptedException e) {
0225: logger
0226: .warn("VirtualDatabaseWorkerThread wait() interrupted");
0227: }
0228: if (timeout && vdbPendingQueue.isEmpty()) {
0229: if (vdb.currentNbOfThreads > vdb.minNbOfThreads) { // We have enough threads, kill this one
0230: isKilled = true;
0231: break;
0232: }
0233: }
0234: }
0235:
0236: if (isKilled) { // Cleaning up
0237: synchronized (vdbActiveThreads) { // Remove ourselves from the appropriate thread list
0238: if (isActive) {
0239: vdbActiveThreads.remove(this );
0240: vdb.decreaseCurrentNbOfThread();
0241: } else
0242: vdb.decreaseIdleThread();
0243: }
0244: // Get out of the while loop
0245: continue;
0246: }
0247:
0248: // Get a connection
0249: try {
0250: in = (DriverBufferedInputStream) vdbPendingQueue
0251: .remove(0);
0252: out = (DriverBufferedOutputStream) vdbPendingQueue
0253: .remove(0);
0254: } catch (Exception e) {
0255: logger
0256: .error("Error while getting streams from connection");
0257: continue;
0258: }
0259:
0260: synchronized (vdbActiveThreads) {
0261: if (!isActive) {
0262: vdb.decreaseIdleThread();
0263: isActive = true;
0264: // Add this thread to the active thread list
0265: vdbActiveThreads.add(this );
0266: }
0267: }
0268: }
0269:
0270: closed = false;
0271:
0272: // Handle connection
0273: // Read the user information and check authentication
0274: /**
0275: * @see org.continuent.sequoia.driver.Driver#connectToController(Properties,
0276: * SequoiaUrl, ControllerInfo)
0277: */
0278: boolean success = false;
0279: try {
0280: login = in.readLongUTF();
0281: String password = in.readLongUTF();
0282: user = new VirtualDatabaseUser(login, password);
0283:
0284: // Pre-check for transparent login
0285: if (vdb.getAuthenticationManager()
0286: .isTransparentLoginEnabled()) {
0287: if (!vdb.getAuthenticationManager()
0288: .isValidVirtualUser(user)) {
0289: vdb.checkAndAddVirtualDatabaseUser(user);
0290: }
0291: }
0292:
0293: if (vdb.getAuthenticationManager().isValidVirtualUser(
0294: user)) { // Authentication ok
0295: out.writeBoolean(true); // success code
0296: out.flush();
0297: success = true;
0298: try {
0299: clientIpAddress = in.getSocket()
0300: .getInetAddress().toString();
0301: } catch (NullPointerException e) // no method above throws anything
0302: {
0303: clientIpAddress = "Unable to fetch client address";
0304: }
0305:
0306: if (logger.isDebugEnabled())
0307: logger.debug("Login accepted for " + login
0308: + " from " + clientIpAddress);
0309:
0310: connectionLineSeparator = in.readLongUTF();
0311: persistentConnection = in.readBoolean();
0312: if (persistentConnection) {
0313: persistentConnectionId = vdb
0314: .getNextConnectionId();
0315: try {
0316: vdb.openPersistentConnection(login,
0317: persistentConnectionId);
0318: out.writeBoolean(true);
0319: out.writeLong(persistentConnectionId);
0320: out.flush();
0321: } catch (SQLException e) {
0322: success = false;
0323: out.writeBoolean(false);
0324: out.flush();
0325: continue;
0326: }
0327: }
0328: retrieveSQLWarnings = in.readBoolean();
0329: } else { // Authentication failed, close the connection
0330: String msg = "Authentication failed for user '"
0331: + login + "'";
0332: out.writeBoolean(false); // authentication failed
0333: out.writeLongUTF(msg); // error message
0334: if (logger.isDebugEnabled())
0335: logger.debug(msg);
0336: endUserLogger.error(Translate.get(
0337: "virtualdatabase.authentication.failed",
0338: login));
0339: continue;
0340: }
0341: } catch (IOException e) {
0342: logger.error("I/O error during user authentication ("
0343: + e + ")");
0344: closed = true;
0345: } finally {
0346: if (!success) {
0347: try {
0348: out.close();
0349: in.close();
0350: } catch (IOException ignore) {
0351: }
0352: }
0353: }
0354:
0355: currentTid = 0;
0356: connectionHasClosed = false;
0357: transactionStarted = false;
0358: transactionHasAborted = false;
0359: transactionIsolation = Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL;
0360: queryExecutedInThisTransaction = false;
0361: writeQueryExecutedInThisTransaction = false;
0362: hasSavepoint = 0;
0363:
0364: int command = -1;
0365: while (!closed && !isKilled) {
0366: try {
0367: // Get the query
0368: waitForCommand = true;
0369: out.writeInt(Commands.ControllerPrompt);
0370: out.flush();
0371: if (Commands.CommandPrefix != in.readInt()) {
0372: logger.error("Protocol corruption with client "
0373: + login + ", last command was:"
0374: + command + ". Closing.");
0375: // FIXME: because of the protocol corruption, this has very little
0376: // chance to actually close the connection. We need something more
0377: // rude here, like shutting down ourselves.
0378: command = Commands.Close;
0379: } else {
0380: try {
0381: command = in.readInt();
0382: } catch (NullPointerException e) {
0383: // SEQUOIA-777: this NPE happens when this thread is blocked
0384: // on in.read() and 'in' gets closed either explicitly
0385: // by the shutdown thread or by an unexpected client socket death.
0386: // The NPE is a known issue with jdk < 1.5, see SEQUOIA-777.
0387: // In the shutdown case, the flag isKilled is set
0388: // and we can exit the while loop. Otherwize, the client socket
0389: // died unexpectedly and we consider this is a CLOSE.
0390: if (isKilled)
0391: continue;
0392: logger
0393: .warn("Client unexpectedly dropped connection. Closing.");
0394: command = Commands.Close;
0395: }
0396: }
0397:
0398: waitForCommand = false;
0399:
0400: // Process it
0401: switch (command) {
0402: case Commands.StatementExecuteQuery:
0403: statementExecuteQuery(null);
0404: break;
0405: case Commands.StatementExecuteUpdate:
0406: statementExecuteUpdate(null);
0407: break;
0408: case Commands.StatementExecuteUpdateWithKeys:
0409: statementExecuteUpdateWithKeys();
0410: break;
0411: case Commands.CallableStatementExecuteQuery:
0412: callableStatementExecuteQuery(null, false);
0413: break;
0414: case Commands.CallableStatementExecuteUpdate:
0415: callableStatementExecuteUpdate(null, false);
0416: break;
0417: case Commands.CallableStatementExecute:
0418: callableStatementExecute(null, false);
0419: break;
0420: case Commands.CallableStatementExecuteQueryWithParameters:
0421: callableStatementExecuteQuery(null, true);
0422: break;
0423: case Commands.CallableStatementExecuteUpdateWithParameters:
0424: callableStatementExecuteUpdate(null, true);
0425: break;
0426: case Commands.CallableStatementExecuteWithParameters:
0427: callableStatementExecute(null, true);
0428: break;
0429: case Commands.StatementExecute:
0430: statementExecute(null);
0431: break;
0432: case Commands.Begin:
0433: begin();
0434: break;
0435: case Commands.Commit:
0436: commit();
0437: break;
0438: case Commands.Rollback:
0439: rollback();
0440: break;
0441: case Commands.SetNamedSavepoint:
0442: setNamedSavepoint();
0443: break;
0444: case Commands.SetUnnamedSavepoint:
0445: setUnnamedSavepoint();
0446: break;
0447: case Commands.ReleaseSavepoint:
0448: releaseSavepoint();
0449: break;
0450: case Commands.RollbackToSavepoint:
0451: rollbackToSavepoint();
0452: break;
0453: case Commands.SetTransactionIsolation:
0454: connectionSetTransactionIsolation();
0455: break;
0456: case Commands.SetReadOnly:
0457: connectionSetReadOnly();
0458: break;
0459: case Commands.ConnectionGetWarnings:
0460: connectionGetWarnings();
0461: break;
0462: case Commands.ConnectionClearWarnings:
0463: connectionClearWarnings();
0464: break;
0465: case Commands.GetVirtualDatabaseName:
0466: getVirtualDatabaseName();
0467: break;
0468: case Commands.DatabaseMetaDataGetDatabaseProductName:
0469: databaseMetaDataGetDatabaseProductName();
0470: break;
0471: case Commands.GetControllerVersionNumber:
0472: getControllerVersionNumber();
0473: break;
0474: case Commands.DatabaseMetaDataGetTables:
0475: databaseMetaDataGetTables();
0476: break;
0477: case Commands.DatabaseMetaDataGetColumns:
0478: databaseMetaDataGetColumns();
0479: break;
0480: case Commands.DatabaseMetaDataGetPrimaryKeys:
0481: databaseMetaDataGetPrimaryKeys();
0482: break;
0483: case Commands.DatabaseMetaDataGetProcedures:
0484: databaseMetaDataGetProcedures();
0485: break;
0486: case Commands.DatabaseMetaDataGetProcedureColumns:
0487: databaseMetaDataGetProcedureColumns();
0488: break;
0489: case Commands.ConnectionGetCatalogs:
0490: connectionGetCatalogs();
0491: break;
0492: case Commands.ConnectionGetCatalog:
0493: connectionGetCatalog();
0494: break;
0495: case Commands.DatabaseMetaDataGetTableTypes:
0496: databaseMetaDataGetTableTypes();
0497: break;
0498: case Commands.DatabaseMetaDataGetSchemas:
0499: databaseMetaDataGetSchemas();
0500: break;
0501: case Commands.DatabaseMetaDataGetTablePrivileges:
0502: databaseMetaDataGetTablePrivileges();
0503: break;
0504: case Commands.DatabaseMetaDataGetAttributes:
0505: databaseMetaDataGetAttributes();
0506: break;
0507: case Commands.DatabaseMetaDataGetBestRowIdentifier:
0508: databaseMetaDataGetBestRowIdentifier();
0509: break;
0510: case Commands.DatabaseMetaDataGetColumnPrivileges:
0511: databaseMetaDataGetColumnPrivileges();
0512: break;
0513: case Commands.DatabaseMetaDataGetCrossReference:
0514: databaseMetaDataGetCrossReference();
0515: break;
0516: case Commands.DatabaseMetaDataGetExportedKeys:
0517: databaseMetaDataGetExportedKeys();
0518: break;
0519: case Commands.DatabaseMetaDataGetImportedKeys:
0520: databaseMetaDataGetImportedKeys();
0521: break;
0522: case Commands.DatabaseMetaDataGetIndexInfo:
0523: databaseMetaDataGetIndexInfo();
0524: break;
0525: case Commands.DatabaseMetaDataGetSuperTables:
0526: databaseMetaDataGetSuperTables();
0527: break;
0528: case Commands.DatabaseMetaDataGetSuperTypes:
0529: databaseMetaDataGetSuperTypes();
0530: break;
0531: case Commands.DatabaseMetaDataGetTypeInfo:
0532: databaseMetaDataGetTypeInfo();
0533: break;
0534: case Commands.DatabaseMetaDataGetUDTs:
0535: databaseMetaDataGetUDTs();
0536: break;
0537: case Commands.DatabaseMetaDataGetVersionColumns:
0538: databaseMetaDataGetVersionColumns();
0539: break;
0540: case Commands.PreparedStatementGetMetaData:
0541: preparedStatementGetMetaData();
0542: break;
0543: case Commands.ConnectionSetCatalog:
0544: connectionSetCatalog();
0545: break;
0546: case Commands.Close:
0547: close();
0548: break;
0549: case Commands.Reset:
0550: reset();
0551: break;
0552: case Commands.FetchNextResultSetRows:
0553: fetchNextResultSetRows();
0554: break;
0555: case Commands.CloseRemoteResultSet:
0556: closeRemoteResultSet();
0557: break;
0558: case Commands.DatabaseStaticMetadata:
0559: databaseStaticMetadata();
0560: break;
0561: case Commands.RestoreConnectionState:
0562: restoreConnectionState();
0563: break;
0564: case Commands.RetrieveExecuteQueryResult:
0565: retrieveExecuteQueryResult();
0566: break;
0567: case Commands.RetrieveExecuteResult:
0568: retrieveExecuteResult();
0569: break;
0570: case Commands.RetrieveExecuteUpdateResult:
0571: retrieveExecuteUpdateResult();
0572: break;
0573: case Commands.RetrieveExecuteUpdateWithKeysResult:
0574: retrieveExecuteUpdateWithKeysResult();
0575: break;
0576: case Commands.RetrieveExecuteQueryResultWithParameters:
0577: retrieveExecuteQueryResultWithParameters();
0578: break;
0579: case Commands.RetrieveExecuteUpdateResultWithParameters:
0580: retrieveExecuteUpdateResultWithParameters();
0581: break;
0582: case Commands.RetrieveExecuteResultWithParameters:
0583: retrieveExecuteResultWithParameters();
0584: break;
0585: case Commands.RetrieveCommitResult:
0586: retrieveCommitResult();
0587: break;
0588: case Commands.RetrieveRollbackResult:
0589: retrieveRollbackResult();
0590: break;
0591: case Commands.RetrieveReleaseSavepoint:
0592: retrieveReleaseSavepoint();
0593: break;
0594: default:
0595: String errorMsg = "Unsupported protocol command: "
0596: + command;
0597: logger.error(errorMsg);
0598: sendToDriver(new RuntimeException(errorMsg));
0599: break;
0600: }
0601: } catch (EOFException e) {
0602: logger.warn("Client (login:"
0603: + login
0604: + ",host:"
0605: + in.getSocket().getInetAddress()
0606: .getHostName()
0607: + " closed connection with server)");
0608: closed = true;
0609: } catch (SocketException e) {
0610: // shutting down
0611: closed = true;
0612: } catch (IOException e) {
0613: closed = true;
0614: logger.warn("Closing connection with client "
0615: + login + " because of IOException.(" + e
0616: + ")");
0617: } catch (VDBisShuttingDownException e) {
0618: isKilled = true;
0619: } catch (SQLException e) {
0620: logger.warn("Error during command execution ("
0621: + e.getMessage() + ")");
0622: if (transactionStarted && !transactionHasAborted) { // Failure of a query within a transaction automatically aborts the
0623: // transaction
0624: transactionHasAborted = (hasSavepoint == 0)
0625: && ((command == Commands.StatementExecuteUpdate)
0626: || (command == Commands.StatementExecuteUpdateWithKeys)
0627: || (command == Commands.StatementExecute)
0628: || (command == Commands.CallableStatementExecuteWithParameters)
0629: || (command == Commands.CallableStatementExecuteQueryWithParameters)
0630: || (command == Commands.CallableStatementExecuteUpdateWithParameters)
0631: || (command == Commands.CallableStatementExecuteQuery) || (command == Commands.CallableStatementExecuteUpdate));
0632: }
0633: try {
0634: sendToDriver(e);
0635: } catch (IOException ignore) {
0636: }
0637: } catch (BadJDBCApiUsageException e) {
0638: logger.warn("Error during command execution ("
0639: + e.getMessage() + ")");
0640: try {
0641: sendToDriver(e);
0642: } catch (IOException ignore) {
0643: }
0644: } catch (Throwable e) {
0645: logger.warn(
0646: "Runtime error during command execution ("
0647: + e.getMessage() + ")", e);
0648: if (transactionStarted) { // Failure of a query within a transaction automatically aborts the
0649: // transaction
0650: transactionHasAborted = (hasSavepoint == 0)
0651: && ((command == Commands.StatementExecuteQuery)
0652: || (command == Commands.StatementExecuteUpdate)
0653: || (command == Commands.StatementExecuteUpdateWithKeys)
0654: || (command == Commands.StatementExecute)
0655: || (command == Commands.CallableStatementExecute)
0656: || (command == Commands.CallableStatementExecuteQuery) || (command == Commands.CallableStatementExecuteUpdate));
0657: }
0658: try {
0659: sendToDriver((SQLException) new SQLException(e
0660: .getLocalizedMessage()).initCause(e));
0661: } catch (IOException ignore) {
0662: }
0663: }
0664: } // while (!closed && !isKilled) get and process command from driver
0665:
0666: // Do the cleanup
0667: if (!streamedResultSets.isEmpty()) {
0668: for (Iterator iter = streamedResultSets.values()
0669: .iterator(); iter.hasNext();) {
0670: ControllerResultSet crs = (ControllerResultSet) iter
0671: .next();
0672: crs.closeResultSet();
0673: }
0674: streamedResultSets.clear();
0675: }
0676:
0677: if (!isKilled) {
0678: // Abort in-flight transaction
0679: if (transactionStarted && !transactionHasAborted) {
0680: if (logger.isDebugEnabled())
0681: logger.debug("Aborting transaction "
0682: + currentTid);
0683: try {
0684: vdb.abort(currentTid,
0685: writeQueryExecutedInThisTransaction,
0686: true);
0687: } catch (Throwable e) {
0688: if (logger.isWarnEnabled())
0689: logger
0690: .warn("Error during abort of transaction "
0691: + currentTid
0692: + "("
0693: + e
0694: + ")");
0695: }
0696: }
0697:
0698: // Close persistent connections
0699: if (persistentConnection) {
0700: vdb.closePersistentConnection(login,
0701: persistentConnectionId);
0702: }
0703: } else {
0704: // FIXME: debug message for safe mode parallel shutdown of controllers
0705: // (note that parallel shutdown should be avoided)
0706: if (logger.isInfoEnabled()) {
0707: logger
0708: .info("VirtualDatabaseWorkerThread killed by shutdown, no clean-up"
0709: + " done. Number of pending transaction in scheduler: "
0710: + vdb.getRequestManager()
0711: .getScheduler()
0712: .getPendingTransactions());
0713: }
0714: }
0715:
0716: // Close streams and underlying socket
0717: try {
0718: in.close();
0719: } catch (IOException ignore) {
0720: }
0721: try {
0722: out.close();
0723: } catch (IOException ignore) {
0724: }
0725: }
0726:
0727: synchronized (vdbActiveThreads) { // Remove ourselves from the appropriate thread list
0728: if (vdbActiveThreads.remove(this ))
0729: vdb.decreaseCurrentNbOfThread();
0730: }
0731:
0732: if (logger.isDebugEnabled())
0733: logger
0734: .debug("VirtualDatabaseWorkerThread associated to login: "
0735: + this .getUser() + " terminating.");
0736: }
0737:
0738: private void close() throws IOException {
0739: if (logger.isDebugEnabled())
0740: logger.debug("Close command");
0741:
0742: cleanup();
0743:
0744: sendToDriver(true);
0745:
0746: closed = true;
0747: }
0748:
0749: private void closeRemoteResultSet() throws IOException {
0750: if (logger.isDebugEnabled())
0751: logger.debug("CloseRemoteResultSet command");
0752:
0753: String cursor = in.readLongUTF();
0754: ControllerResultSet crsToClose = (ControllerResultSet) streamedResultSets
0755: .remove(cursor);
0756: if (crsToClose == null) {
0757: sendToDriver(new SQLException(
0758: "No valid RemoteResultSet to close."));
0759: } else {
0760: crsToClose.closeResultSet();
0761: sendToDriver(true);
0762: }
0763: }
0764:
0765: private void reset() throws IOException {
0766: // The client application has closed the connection but it is kept
0767: // open in case the transparent connection pooling reuses it.
0768: if (logger.isDebugEnabled())
0769: logger.debug("Reset command");
0770:
0771: cleanup();
0772:
0773: connectionHasClosed = false;
0774: currentTid = 0;
0775: transactionStarted = false;
0776: transactionHasAborted = false;
0777: transactionIsolation = Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL;
0778: queryExecutedInThisTransaction = false;
0779: hasSavepoint = 0;
0780: sendToDriver(true);
0781: }
0782:
0783: private void cleanup() {
0784: // Do the cleanup
0785: if (transactionStarted && !transactionHasAborted) {
0786: /*
0787: * We need to abort the begin to cleanup the metadata associated with the
0788: * started transaction.
0789: */
0790: if (logger.isDebugEnabled())
0791: logger.debug("Aborting transaction " + currentTid);
0792: try {
0793: vdb.abort(currentTid,
0794: writeQueryExecutedInThisTransaction, true);
0795: } catch (Exception e) {
0796: if (logger.isDebugEnabled())
0797: logger.debug("Error while aborting transaction "
0798: + currentTid + "(" + e + ")", e);
0799: }
0800: }
0801: }
0802:
0803: private void restoreConnectionState() throws IOException,
0804: SQLException {
0805: if (logger.isDebugEnabled())
0806: logger.debug("RestoreConnectionState command");
0807:
0808: // Re-connect has opened a new persistent connection that will not be used
0809: if (persistentConnection) {
0810: vdb
0811: .closePersistentConnection(login,
0812: persistentConnectionId);
0813: }
0814:
0815: writeQueryExecutedInThisTransaction = in.readBoolean();
0816: // We receive autocommit from driver
0817: transactionStarted = !in.readBoolean();
0818: if (transactionStarted)
0819: currentTid = in.readLong();
0820: persistentConnection = in.readBoolean();
0821: if (persistentConnection)
0822: persistentConnectionId = in.readLong();
0823:
0824: // Restore the persistent connection first (if any) before trying to perform
0825: // any operation on the transaction
0826: if (persistentConnection) {
0827: if (!vdb.hasPersistentConnection(persistentConnectionId)) {
0828: RecoveryLog recoveryLog = vdb.getRequestManager()
0829: .getRecoveryLog();
0830: if (!recoveryLog
0831: .findCloseForPersistentConnection(persistentConnectionId)) {
0832: vdb
0833: .failoverForPersistentConnection(persistentConnectionId);
0834: } else {
0835: connectionHasClosed = true;
0836: }
0837: }
0838: }
0839:
0840: retrieveSQLWarnings = in.readBoolean();
0841:
0842: // Acknowledge driver
0843: out.writeBoolean(true);
0844: out.flush();
0845:
0846: if (transactionStarted) {
0847: try {
0848: // Check if the transaction exists here
0849: vdb.requestManager.getTransactionMetaData(new Long(
0850: currentTid));
0851: // Only notify failover if we have the transaction in our context
0852: vdb.failoverForTransaction(currentTid);
0853: /*
0854: * Transaction is started on this controller... it was either a
0855: * transaction that contained write statements, either a read-only
0856: * transaction with broadcasted statements, so we force
0857: * writeQueryExecutedInThisTransaction to true.
0858: */
0859: writeQueryExecutedInThisTransaction = true;
0860: } catch (SQLException e) {
0861: /*
0862: * Transaction has not been found because it either already
0863: * committed/rollbacked or it was not started (no request played so far
0864: * in the transaction or just read queries on the controller that has
0865: * failed). Check first if we can find a trace of commit/rollback in the
0866: * recovery log and if not start the transaction now. This is needed
0867: * only if it was a write transaction or a transaction with broadcasted
0868: * read requests.
0869: */
0870: RecoveryLog recoveryLog = vdb.getRequestManager()
0871: .getRecoveryLog();
0872: if (writeQueryExecutedInThisTransaction) {
0873: if (!recoveryLog
0874: .findCommitForTransaction(currentTid)
0875: && !recoveryLog
0876: .findRollbackForTransaction(currentTid)) {
0877: vdb.requestManager.doBegin(login, currentTid,
0878: persistentConnection,
0879: persistentConnectionId);
0880: } else {
0881: // FIXME dirty overload of this flag semantics
0882: transactionHasAborted = true;
0883: }
0884: } else {
0885: vdb.requestManager.doBegin(login, currentTid,
0886: persistentConnection,
0887: persistentConnectionId);
0888: writeQueryExecutedInThisTransaction = true;
0889: }
0890: }
0891: }
0892: }
0893:
0894: //
0895: // Catalog
0896: //
0897:
0898: private void connectionSetCatalog() throws IOException {
0899: // Warning! This could bypass the security checkings based on client IP
0900: // address. If a user has access to a virtual database, through setCatalog()
0901: // is will be able to access all other virtual databases where his
0902: // login/password is valid regardless of the IP filtering settings.
0903: if (logger.isDebugEnabled())
0904: logger.debug("ConnectionSetCatalog command");
0905: String catalog = in.readLongUTF();
0906: boolean change = controller.hasVirtualDatabase(catalog);
0907: if (change) {
0908: VirtualDatabase tempvdb = controller
0909: .getVirtualDatabase(catalog);
0910: if (!tempvdb.getAuthenticationManager().isValidVirtualUser(
0911: user))
0912: sendToDriver(new SQLException(
0913: "User authentication has failed for asked catalog. No change"));
0914: else {
0915: this .vdb = tempvdb;
0916: sendToDriver(true);
0917: }
0918: } else
0919: sendToDriver(false);
0920:
0921: }
0922:
0923: private void connectionGetCatalog() throws IOException {
0924: if (logger.isDebugEnabled())
0925: logger.debug("ConnectionGetCatalog command");
0926:
0927: sendToDriver(vdb.getVirtualDatabaseName());
0928: }
0929:
0930: private void connectionGetCatalogs() throws IOException {
0931: if (logger.isDebugEnabled())
0932: logger.debug("ConnectionGetCatalogs command");
0933: ArrayList list = controller.getVirtualDatabaseNames();
0934: sendToDriver(vdb.getDynamicMetaData().getCatalogs(list));
0935: }
0936:
0937: private void connectionSetTransactionIsolation() throws IOException {
0938: int level = in.readInt();
0939: if (logger.isDebugEnabled())
0940: logger.debug("SetTransactionIsolation command (level="
0941: + level + ")");
0942:
0943: // Check that we are not in a running transaction
0944: if (transactionStarted && queryExecutedInThisTransaction) {
0945: sendToDriver(new SQLException(
0946: "Cannot change the transaction isolation in a running transaction"));
0947: return;
0948: }
0949:
0950: MetadataContainer metadataContainer = vdb.getStaticMetaData()
0951: .getMetadataContainer();
0952: if (metadataContainer != null) {
0953: Object value = metadataContainer
0954: .get(MetadataContainer
0955: .getContainerKey(
0956: MetadataDescription.SUPPORTS_TRANSACTION_ISOLATION_LEVEL,
0957: new Class[] { Integer.TYPE },
0958: new Object[] { new Integer(level) }));
0959:
0960: if (value != null) {
0961: if (!((Boolean) value).booleanValue()) {
0962: sendToDriver(new SQLException(
0963: "Transaction isolation level "
0964: + level
0965: + " is not supported by the database"));
0966: return;
0967: }
0968: } else
0969: logger
0970: .warn("Unable to check validity of transaction isolation level "
0971: + level);
0972: } else
0973: logger
0974: .warn("Unable to check validity of transaction isolation level "
0975: + level);
0976: transactionIsolation = level;
0977: sendToDriver(true);
0978: }
0979:
0980: private void connectionSetReadOnly() throws IOException {
0981: isReadOnly = in.readBoolean();
0982: if (logger.isDebugEnabled())
0983: logger.debug("SetReadOnly command (value=" + true + ")");
0984:
0985: sendToDriver(true);
0986: }
0987:
0988: private void connectionGetWarnings() throws IOException {
0989: long persistentConnId = in.readLong();
0990: try {
0991: sendToDriver(vdb.getConnectionWarnings(persistentConnId));
0992: } catch (SQLException e) {
0993: sendToDriver(e);
0994: }
0995: }
0996:
0997: private void connectionClearWarnings() throws IOException {
0998: long persistentConnId = in.readLong();
0999: try {
1000: vdb.clearConnectionWarnings(persistentConnId);
1001: sendToDriver(true);
1002: } catch (SQLException e) {
1003: sendToDriver(e);
1004: }
1005: }
1006:
1007: //
1008: // Database MetaData
1009: //
1010:
1011: /**
1012: * @see java.sql.DatabaseMetaData#getAttributes(java.lang.String,
1013: * java.lang.String, java.lang.String, java.lang.String)
1014: */
1015: private void databaseMetaDataGetAttributes() throws IOException {
1016: if (logger.isDebugEnabled())
1017: logger.debug("DatabaseMetaDataGetAttributes command");
1018: String catalog = in.readLongUTF();
1019: String schemaPattern = in.readLongUTF();
1020: String typeNamePattern = in.readLongUTF();
1021: String attributeNamePattern = in.readLongUTF();
1022:
1023: try {
1024: sendToDriver(vdb.getDynamicMetaData().getAttributes(
1025: new ConnectionContext(login, transactionStarted,
1026: currentTid, persistentConnection,
1027: persistentConnectionId), catalog,
1028: schemaPattern, typeNamePattern,
1029: attributeNamePattern));
1030: } catch (SQLException e) {
1031: if (logger.isWarnEnabled())
1032: logger
1033: .warn(
1034: "Error while calling databaseMetaDataGetAttributes",
1035: e);
1036: sendToDriver(e);
1037: }
1038: }
1039:
1040: /**
1041: * @see java.sql.DatabaseMetaData#getBestRowIdentifier(java.lang.String,
1042: * java.lang.String, java.lang.String, int, boolean)
1043: */
1044: private void databaseMetaDataGetBestRowIdentifier()
1045: throws IOException {
1046: if (logger.isDebugEnabled())
1047: logger
1048: .debug("DatabaseMetaDataGetBestRowIdentifier command");
1049:
1050: String catalog = in.readLongUTF();
1051: String schema = in.readLongUTF();
1052: String table = in.readLongUTF();
1053: int scope = in.readInt();
1054: boolean nullable = in.readBoolean();
1055:
1056: try {
1057: sendToDriver(vdb.getDynamicMetaData().getBestRowIdentifier(
1058: new ConnectionContext(login, transactionStarted,
1059: currentTid, persistentConnection,
1060: persistentConnectionId), catalog, schema,
1061: table, scope, nullable));
1062: } catch (SQLException e) {
1063: if (logger.isWarnEnabled())
1064: logger
1065: .warn(
1066: "Error while calling databaseMetaDataGetBestRowIdentifier",
1067: e);
1068: sendToDriver(e);
1069: }
1070: }
1071:
1072: /**
1073: * @see java.sql.DatabaseMetaData#getColumnPrivileges(java.lang.String,
1074: * java.lang.String, java.lang.String, java.lang.String)
1075: */
1076: private void databaseMetaDataGetColumnPrivileges()
1077: throws IOException {
1078: if (logger.isDebugEnabled())
1079: logger.debug("DatabaseMetaDataGetColumnPrivileges command");
1080:
1081: String catalog = in.readLongUTF();
1082: String schema = in.readLongUTF();
1083: String table = in.readLongUTF();
1084: String columnNamePattern = in.readLongUTF();
1085:
1086: try {
1087: sendToDriver(vdb.getDynamicMetaData().getColumnPrivileges(
1088: new ConnectionContext(login, transactionStarted,
1089: currentTid, persistentConnection,
1090: persistentConnectionId), catalog, schema,
1091: table, columnNamePattern));
1092: } catch (SQLException e) {
1093: if (logger.isWarnEnabled())
1094: logger
1095: .warn(
1096: "Error while calling databaseMetaDataGetColumnPrivileges",
1097: e);
1098: sendToDriver(e);
1099: }
1100: }
1101:
1102: /**
1103: * @see java.sql.DatabaseMetaData#getColumns(java.lang.String,
1104: * java.lang.String, java.lang.String, java.lang.String)
1105: */
1106: private void databaseMetaDataGetColumns() throws IOException {
1107: if (logger.isDebugEnabled())
1108: logger.debug("DatabaseMetaDataGetColumns command");
1109: String ccatalog = in.readLongUTF();
1110: String cschemaPattern = in.readLongUTF();
1111: String ctableNamePattern = in.readLongUTF();
1112: String ccolumnNamePattern = in.readLongUTF();
1113:
1114: try {
1115: sendToDriver(vdb.getDynamicMetaData().getColumns(
1116: new ConnectionContext(login, transactionStarted,
1117: currentTid, persistentConnection,
1118: persistentConnectionId), ccatalog,
1119: cschemaPattern, ctableNamePattern,
1120: ccolumnNamePattern));
1121: } catch (SQLException e) {
1122: if (logger.isWarnEnabled())
1123: logger
1124: .warn(
1125: "Error while calling databaseMetaDataGetColumns",
1126: e);
1127: sendToDriver(e);
1128: }
1129: }
1130:
1131: /**
1132: * @see java.sql.DatabaseMetaData#getCrossReference(java.lang.String,
1133: * java.lang.String, java.lang.String, java.lang.String,
1134: * java.lang.String, java.lang.String)
1135: */
1136: private void databaseMetaDataGetCrossReference() throws IOException {
1137: if (logger.isDebugEnabled())
1138: logger.debug("DatabaseMetaDataGetCrossReference command");
1139:
1140: String primaryCatalog = in.readLongUTF();
1141: String primarySchema = in.readLongUTF();
1142: String primaryTable = in.readLongUTF();
1143: String foreignCatalog = in.readLongUTF();
1144: String foreignSchema = in.readLongUTF();
1145: String foreignTable = in.readLongUTF();
1146:
1147: try {
1148: sendToDriver(vdb.getDynamicMetaData().getCrossReference(
1149: new ConnectionContext(login, transactionStarted,
1150: currentTid, persistentConnection,
1151: persistentConnectionId), primaryCatalog,
1152: primarySchema, primaryTable, foreignCatalog,
1153: foreignSchema, foreignTable));
1154: } catch (SQLException e) {
1155: if (logger.isWarnEnabled())
1156: logger
1157: .warn(
1158: "Error while calling databaseMetaDataGetCrossReference",
1159: e);
1160: sendToDriver(e);
1161: }
1162: }
1163:
1164: /**
1165: * @see java.sql.DatabaseMetaData#getDatabaseProductName()
1166: */
1167: private void databaseMetaDataGetDatabaseProductName()
1168: throws IOException {
1169: if (logger.isDebugEnabled())
1170: logger.debug("GetDatabaseProductName command");
1171:
1172: sendToDriver(vdb.getDatabaseProductName());
1173: }
1174:
1175: /**
1176: * @see java.sql.DatabaseMetaData#getExportedKeys(java.lang.String,
1177: * java.lang.String, java.lang.String)
1178: */
1179: private void databaseMetaDataGetExportedKeys() throws IOException {
1180: if (logger.isDebugEnabled())
1181: logger.debug("DatabaseMetaDataGetExportedKeys command");
1182:
1183: String catalog = in.readLongUTF();
1184: String schema = in.readLongUTF();
1185: String table = in.readLongUTF();
1186:
1187: try {
1188: sendToDriver(vdb.getDynamicMetaData().getExportedKeys(
1189: new ConnectionContext(login, transactionStarted,
1190: currentTid, persistentConnection,
1191: persistentConnectionId), catalog, schema,
1192: table));
1193: } catch (SQLException e) {
1194: if (logger.isWarnEnabled())
1195: logger
1196: .warn(
1197: "Error while calling databaseMetaDataGetExportedKeys",
1198: e);
1199: sendToDriver(e);
1200: }
1201: }
1202:
1203: /**
1204: * @see java.sql.DatabaseMetaData#getImportedKeys(java.lang.String,
1205: * java.lang.String, java.lang.String)
1206: */
1207: private void databaseMetaDataGetImportedKeys() throws IOException {
1208: if (logger.isDebugEnabled())
1209: logger.debug("DatabaseMetaDataGetImportedKeys command");
1210:
1211: String catalog = in.readLongUTF();
1212: String schema = in.readLongUTF();
1213: String table = in.readLongUTF();
1214:
1215: try {
1216: sendToDriver(vdb.getDynamicMetaData().getImportedKeys(
1217: new ConnectionContext(login, transactionStarted,
1218: currentTid, persistentConnection,
1219: persistentConnectionId), catalog, schema,
1220: table));
1221: } catch (SQLException e) {
1222: if (logger.isWarnEnabled())
1223: logger
1224: .warn(
1225: "Error while calling databaseMetaDataGetImportedKeys",
1226: e);
1227: sendToDriver(e);
1228: }
1229: }
1230:
1231: /**
1232: * @see java.sql.DatabaseMetaData#getIndexInfo(java.lang.String,
1233: * java.lang.String, java.lang.String, boolean, boolean)
1234: */
1235: private void databaseMetaDataGetIndexInfo() throws IOException {
1236: if (logger.isDebugEnabled())
1237: logger.debug("databaseMetaDataGetIndexInfo command");
1238:
1239: String catalog = in.readLongUTF();
1240: String schema = in.readLongUTF();
1241: String table = in.readLongUTF();
1242: boolean unique = in.readBoolean();
1243: boolean approximate = in.readBoolean();
1244:
1245: try {
1246: sendToDriver(vdb.getDynamicMetaData().getIndexInfo(
1247: new ConnectionContext(login, transactionStarted,
1248: currentTid, persistentConnection,
1249: persistentConnectionId), catalog, schema,
1250: table, unique, approximate));
1251: } catch (SQLException e) {
1252: if (logger.isWarnEnabled())
1253: logger
1254: .warn(
1255: "Error while calling databaseMetaDataGetIndexInfo",
1256: e);
1257: sendToDriver(e);
1258: }
1259: }
1260:
1261: /**
1262: * @see java.sql.DatabaseMetaData#getPrimaryKeys(java.lang.String,
1263: * java.lang.String, java.lang.String)
1264: */
1265: private void databaseMetaDataGetPrimaryKeys() throws IOException {
1266: if (logger.isDebugEnabled())
1267: logger.debug("DatabaseMetaDataGetPrimaryKeys command");
1268:
1269: String pcatalog = in.readLongUTF();
1270: String pschemaPattern = in.readLongUTF();
1271: String ptableNamePattern = in.readLongUTF();
1272:
1273: try {
1274: sendToDriver(vdb.getDynamicMetaData().getPrimaryKeys(
1275: new ConnectionContext(login, transactionStarted,
1276: currentTid, persistentConnection,
1277: persistentConnectionId), pcatalog,
1278: pschemaPattern, ptableNamePattern));
1279: } catch (SQLException e) {
1280: if (logger.isWarnEnabled())
1281: logger
1282: .warn(
1283: "Error while calling databaseMetaDataGetPrimaryKeys",
1284: e);
1285: sendToDriver(e);
1286: }
1287: }
1288:
1289: /**
1290: * @see java.sql.DatabaseMetaData#getProcedureColumns(java.lang.String,
1291: * java.lang.String, java.lang.String, java.lang.String)
1292: */
1293: private void databaseMetaDataGetProcedureColumns()
1294: throws IOException {
1295: if (logger.isDebugEnabled())
1296: logger.debug("DatabaseMetaDataGetProcedureColumns command");
1297:
1298: String pccatalog = in.readLongUTF();
1299: String pcschemaPattern = in.readLongUTF();
1300: String pcprocedureNamePattern = in.readLongUTF();
1301: String pccolumnNamePattern = in.readLongUTF();
1302:
1303: try {
1304: sendToDriver(vdb.getDynamicMetaData().getProcedureColumns(
1305: new ConnectionContext(login, transactionStarted,
1306: currentTid, persistentConnection,
1307: persistentConnectionId), pccatalog,
1308: pcschemaPattern, pcprocedureNamePattern,
1309: pccolumnNamePattern));
1310: } catch (SQLException e) {
1311: if (logger.isWarnEnabled())
1312: logger
1313: .warn(
1314: "Error while calling databaseMetaDataGetProcedureColumns",
1315: e);
1316: sendToDriver(e);
1317: }
1318: }
1319:
1320: /**
1321: * @see java.sql.DatabaseMetaData#getProcedures(java.lang.String,
1322: * java.lang.String, java.lang.String)
1323: */
1324: private void databaseMetaDataGetProcedures() throws IOException {
1325: if (logger.isDebugEnabled())
1326: logger.debug("DatabaseMetaDataGetProcedures command");
1327:
1328: String rcatalog = in.readLongUTF();
1329: String rschemaPattern = in.readLongUTF();
1330: String procedureNamePattern = in.readLongUTF();
1331:
1332: try {
1333: sendToDriver(vdb.getDynamicMetaData().getProcedures(
1334: new ConnectionContext(login, transactionStarted,
1335: currentTid, persistentConnection,
1336: persistentConnectionId), rcatalog,
1337: rschemaPattern, procedureNamePattern));
1338: } catch (SQLException e) {
1339: if (logger.isWarnEnabled())
1340: logger
1341: .warn(
1342: "Error while calling databaseMetaDataGetProcedures",
1343: e);
1344: sendToDriver(e);
1345: }
1346: }
1347:
1348: /**
1349: * @see java.sql.DatabaseMetaData#getSchemas()
1350: */
1351: private void databaseMetaDataGetSchemas() throws IOException {
1352: if (logger.isDebugEnabled())
1353: logger.debug("DatabaseMetaDataGetSchemas Types command");
1354:
1355: try {
1356: sendToDriver(vdb.getDynamicMetaData().getSchemas(
1357: new ConnectionContext(login, transactionStarted,
1358: currentTid, persistentConnection,
1359: persistentConnectionId)));
1360: } catch (SQLException e) {
1361: if (logger.isWarnEnabled())
1362: logger
1363: .warn(
1364: "Error while calling databaseMetaDataGetSchemas",
1365: e);
1366: sendToDriver(e);
1367: }
1368: }
1369:
1370: /**
1371: * @see java.sql.DatabaseMetaData#getSuperTables(java.lang.String,
1372: * java.lang.String, java.lang.String)
1373: */
1374: private void databaseMetaDataGetSuperTables() throws IOException {
1375: if (logger.isDebugEnabled())
1376: logger.debug("DatabaseMetaDataGetSuperTables command");
1377:
1378: String catalog = in.readLongUTF();
1379: String schemaPattern = in.readLongUTF();
1380: String tableNamePattern = in.readLongUTF();
1381:
1382: try {
1383: sendToDriver(vdb.getDynamicMetaData().getSuperTables(
1384: new ConnectionContext(login, transactionStarted,
1385: currentTid, persistentConnection,
1386: persistentConnectionId), catalog,
1387: schemaPattern, tableNamePattern));
1388: } catch (SQLException e) {
1389: if (logger.isWarnEnabled())
1390: logger
1391: .warn(
1392: "Error while calling databaseMetaDataGetSuperTables",
1393: e);
1394: sendToDriver(e);
1395: }
1396: }
1397:
1398: /**
1399: * @see java.sql.DatabaseMetaData#getSuperTypes(java.lang.String,
1400: * java.lang.String, java.lang.String)
1401: */
1402: private void databaseMetaDataGetSuperTypes() throws IOException {
1403: if (logger.isDebugEnabled())
1404: logger.debug("DatabaseMetaDataGetSuperTables command");
1405:
1406: String catalog = in.readLongUTF();
1407: String schemaPattern = in.readLongUTF();
1408: String tableNamePattern = in.readLongUTF();
1409:
1410: try {
1411: sendToDriver(vdb.getDynamicMetaData().getSuperTypes(
1412: new ConnectionContext(login, transactionStarted,
1413: currentTid, persistentConnection,
1414: persistentConnectionId), catalog,
1415: schemaPattern, tableNamePattern));
1416: } catch (SQLException e) {
1417: if (logger.isWarnEnabled())
1418: logger
1419: .warn(
1420: "Error while calling databaseMetaDataGetSuperTypes",
1421: e);
1422: sendToDriver(e);
1423: }
1424: }
1425:
1426: /**
1427: * @see java.sql.DatabaseMetaData#getTablePrivileges(java.lang.String,
1428: * java.lang.String, java.lang.String)
1429: */
1430: private void databaseMetaDataGetTablePrivileges()
1431: throws IOException {
1432: if (logger.isDebugEnabled())
1433: logger.debug("DatabaseMetaDataGetTablePrivileges command");
1434:
1435: String tpcatalog = in.readLongUTF();
1436: String tpschemaPattern = in.readLongUTF();
1437: String tptablePattern = in.readLongUTF();
1438:
1439: try {
1440: sendToDriver(vdb.getDynamicMetaData().getTablePrivileges(
1441: new ConnectionContext(login, transactionStarted,
1442: currentTid, persistentConnection,
1443: persistentConnectionId), tpcatalog,
1444: tpschemaPattern, tptablePattern));
1445: } catch (SQLException e) {
1446: if (logger.isWarnEnabled())
1447: logger
1448: .warn(
1449: "Error while calling databaseMetaDataGetTablePrivileges",
1450: e);
1451: sendToDriver(e);
1452: }
1453: }
1454:
1455: /**
1456: * @see java.sql.DatabaseMetaData#getTables(java.lang.String,
1457: * java.lang.String, java.lang.String, java.lang.String[])
1458: */
1459: private void databaseMetaDataGetTables() throws IOException {
1460: if (logger.isDebugEnabled())
1461: logger.debug("DatabaseMetaDataGetTables command");
1462:
1463: String tcatalog = in.readLongUTF();
1464: String tschemaPattern = in.readLongUTF();
1465: String ttableNamePattern = in.readLongUTF();
1466:
1467: String[] ttypes = null;
1468: if (in.readBoolean()) {
1469: int size = in.readInt();
1470: ttypes = new String[size];
1471: for (int i = 0; i < size; i++)
1472: ttypes[i] = in.readLongUTF();
1473: }
1474:
1475: try {
1476: sendToDriver(vdb.getDynamicMetaData().getTables(
1477: new ConnectionContext(login, transactionStarted,
1478: currentTid, persistentConnection,
1479: persistentConnectionId), tcatalog,
1480: tschemaPattern, ttableNamePattern, ttypes));
1481: } catch (SQLException e) {
1482: if (logger.isWarnEnabled())
1483: logger
1484: .warn(
1485: "Error while calling databaseMetaDataGetTables",
1486: e);
1487: sendToDriver(e);
1488: }
1489: }
1490:
1491: /**
1492: * @see java.sql.DatabaseMetaData#getTableTypes()
1493: */
1494: private void databaseMetaDataGetTableTypes() throws IOException {
1495: if (logger.isDebugEnabled())
1496: logger.debug("DatabaseMetaDataGetTableTypes command");
1497:
1498: try {
1499: sendToDriver(vdb.getDynamicMetaData().getTableTypes(
1500: new ConnectionContext(login, transactionStarted,
1501: currentTid, persistentConnection,
1502: persistentConnectionId)));
1503: } catch (SQLException e) {
1504: if (logger.isWarnEnabled())
1505: logger
1506: .warn(
1507: "Error while calling databaseMetaDataGetTableTypes",
1508: e);
1509: sendToDriver(e);
1510: }
1511: }
1512:
1513: /**
1514: * @see java.sql.DatabaseMetaData#getTypeInfo()
1515: */
1516: private void databaseMetaDataGetTypeInfo() throws IOException {
1517: if (logger.isDebugEnabled())
1518: logger.debug("DatabaseMetaDataGetTypeInfo command");
1519:
1520: try {
1521: sendToDriver(vdb.getDynamicMetaData().getTypeInfo(
1522: new ConnectionContext(login, transactionStarted,
1523: currentTid, persistentConnection,
1524: persistentConnectionId)));
1525: } catch (SQLException e) {
1526: if (logger.isWarnEnabled())
1527: logger
1528: .warn(
1529: "Error while calling databaseMetaDataGetTypeInfo",
1530: e);
1531: sendToDriver(e);
1532: }
1533: }
1534:
1535: /**
1536: * @see java.sql.DatabaseMetaData#getUDTs(java.lang.String, java.lang.String,
1537: * java.lang.String, int[])
1538: */
1539: private void databaseMetaDataGetUDTs() throws IOException {
1540: if (logger.isDebugEnabled())
1541: logger.debug("DatabaseMetaDataGetUDTs command");
1542:
1543: String catalog = in.readLongUTF();
1544: String schemaPattern = in.readLongUTF();
1545: String tableNamePattern = in.readLongUTF();
1546:
1547: int[] types = null;
1548: if (in.readBoolean()) {
1549: int size = in.readInt();
1550: types = new int[size];
1551: for (int i = 0; i < size; i++)
1552: types[i] = in.readInt();
1553: }
1554:
1555: try {
1556: sendToDriver(vdb.getDynamicMetaData().getUDTs(
1557: new ConnectionContext(login, transactionStarted,
1558: currentTid, persistentConnection,
1559: persistentConnectionId), catalog,
1560: schemaPattern, tableNamePattern, types));
1561: } catch (SQLException e) {
1562: if (logger.isWarnEnabled())
1563: logger.warn(
1564: "Error while calling databaseMetaDataGetUDTs",
1565: e);
1566: sendToDriver(e);
1567: }
1568: }
1569:
1570: /**
1571: * @see java.sql.DatabaseMetaData#getVersionColumns(java.lang.String,
1572: * java.lang.String, java.lang.String)
1573: */
1574: private void databaseMetaDataGetVersionColumns() throws IOException {
1575: if (logger.isDebugEnabled())
1576: logger.debug("DatabaseMetaDataGetVersionColumns command");
1577:
1578: String catalog = in.readLongUTF();
1579: String schema = in.readLongUTF();
1580: String table = in.readLongUTF();
1581:
1582: try {
1583: sendToDriver(vdb.getDynamicMetaData().getVersionColumns(
1584: new ConnectionContext(login, transactionStarted,
1585: currentTid, persistentConnection,
1586: persistentConnectionId), catalog, schema,
1587: table));
1588: } catch (SQLException e) {
1589: if (logger.isWarnEnabled())
1590: logger
1591: .warn(
1592: "Error while calling databaseMetaDataGetVersionColumns",
1593: e);
1594: sendToDriver(e);
1595: }
1596: }
1597:
1598: /**
1599: * Get the static metadata key from the socket and return the corresponding
1600: * metadata.
1601: *
1602: * @throws IOException if an IO error occurs
1603: * @throws NotImplementedException if the underlying metadata access method is
1604: * not implemented
1605: */
1606: private void databaseStaticMetadata() throws IOException,
1607: NotImplementedException {
1608: // the "getXXX(Y,Z,...)" hash key of the metadata
1609: // query called by the client using the driver.
1610: String key = in.readLongUTF();
1611: if (logger.isDebugEnabled())
1612: logger.debug("DatabaseStaticMetadata command for " + key);
1613: MetadataContainer container = vdb.getStaticMetaData()
1614: .getMetadataContainer();
1615: if (container == null) // no metadata has been gathered yet from backends
1616: {
1617: String msg = "No metadata is available probably because no backend is enabled on that controller.";
1618: logger.info(msg);
1619: sendToDriver(new SQLException(msg));
1620: } else {
1621: /**
1622: * To get an exhaustive list of all the types of java objects stored in
1623: * this hash table, search for all callers of
1624: * {@link org.continuent.sequoia.driver.DatabaseMetaData#getMetadata(String, Class[], Object[], boolean)}
1625: * and see also
1626: * {@link org.continuent.sequoia.controller.backend.DatabaseBackendMetaData#retrieveDatabaseMetadata()}
1627: * At this time it's limited to the following types: String, int and
1628: * boolean. boolean is the most frequent.
1629: */
1630: /*
1631: * Since we don't expect that any of these metadata methods will ever
1632: * return a non- java.sql.Types, we re-use here the serialization
1633: * implemented for SQL Data/ResultSets elements.
1634: */
1635:
1636: SQLDataSerialization.Serializer serializer;
1637: Object result = container.get(key);
1638:
1639: try {
1640: serializer = SQLDataSerialization.getSerializer(result);
1641: // TODO: clean-up this.
1642: if (serializer.isUndefined()) // <=> result == null
1643: throw new NotImplementedException();
1644: } catch (NotImplementedException innerEx) { // Should we just print a warning in case result == null ?
1645: // This should never happen with decent drivers.
1646: String msg;
1647: if (null == result)
1648: msg = " returned a null object.";
1649: else
1650: msg = " returned an object of an unsupported java type:"
1651: + result.getClass().getName() + ".";
1652:
1653: NotImplementedException outerEx = new NotImplementedException(
1654: "Backend driver method " + key + msg);
1655: outerEx.initCause(innerEx);
1656: throw outerEx;
1657: }
1658:
1659: TypeTag.NOT_EXCEPTION.sendToStream(out);
1660: serializer.getTypeTag().sendToStream(out);
1661: serializer.sendToStream(result, out);
1662: }
1663:
1664: out.flush();
1665: }
1666:
1667: private void preparedStatementGetMetaData() throws IOException {
1668: if (logger.isDebugEnabled())
1669: logger.debug("PreparedStatementGetMetaData command");
1670:
1671: String sqlTemplate = in.readLongUTF();
1672:
1673: try {
1674: AbstractRequest request = new UnknownWriteRequest(
1675: sqlTemplate, false, 0, "");
1676: request.setIsAutoCommit(!transactionStarted);
1677: setRequestParametersAndTransactionStarted(request);
1678: sendToDriver(vdb.getPreparedStatementGetMetaData(request));
1679: } catch (SQLException e) {
1680: if (logger.isWarnEnabled())
1681: logger
1682: .warn(
1683: "Error while calling databaseMetaDataGetVersionColumns",
1684: e);
1685: sendToDriver(e);
1686: }
1687: }
1688:
1689: private void getControllerVersionNumber() throws IOException {
1690: if (logger.isDebugEnabled())
1691: logger.debug("GetControllerVersionNumber command");
1692:
1693: sendToDriver(Constants.VERSION);
1694: }
1695:
1696: private void getVirtualDatabaseName() throws IOException {
1697: if (logger.isDebugEnabled())
1698: logger.debug("GetVirtualDatabaseName command");
1699:
1700: sendToDriver(vdb.getDatabaseName());
1701: }
1702:
1703: //
1704: // Transaction management
1705: //
1706:
1707: /**
1708: * Check that we did not get a concurrent abort due to deadlock detection.
1709: *
1710: * @param request request that was executing
1711: * @throws SQLException if a concurrent abort has been detected
1712: */
1713: private void checkForConcurrentAbort(AbstractRequest request)
1714: throws SQLException {
1715: if (transactionStarted) {
1716: //
1717: synchronized (this ) {
1718: if (transactionHasAborted) {
1719: /*
1720: * If the transaction was aborted before we execute we would never
1721: * have reached this point and vdb.execWriteRequest(write) would have
1722: * thrown a SQLException. Now we have to force a rollback because we
1723: * have probably lazily re-started the transaction and that has to be
1724: * cleaned up.
1725: */
1726: vdb.rollback(currentTid,
1727: writeQueryExecutedInThisTransaction);
1728: throw new SQLException("Transaction " + currentTid
1729: + " aborted, request " + request
1730: + "failed.");
1731: }
1732: }
1733: }
1734: }
1735:
1736: /**
1737: * Commit the current transaction and reset the transaction state. If
1738: * sendTransactionId is true, the current transaction id is send back to the
1739: * driver else 'true' is sent back. See SEQUOIA-703.
1740: *
1741: * @throws SQLException if an error occurs at commit time
1742: * @throws IOException if an error occurs when sending the value to the driver
1743: */
1744: private void commit() throws SQLException, IOException {
1745: if (logger.isDebugEnabled())
1746: logger.debug("Commit command");
1747:
1748: if (!transactionHasAborted)
1749: vdb.commit(currentTid, writeQueryExecutedInThisTransaction,
1750: !queryExecutedInThisTransaction);
1751: else if (logger.isWarnEnabled()) {
1752: logger.warn("Transaction " + currentTid
1753: + " was aborted by database");
1754: }
1755:
1756: // acknowledged the commit (even if transaction is aborted)
1757: sendToDriver(currentTid);
1758:
1759: resetTransactionState();
1760: }
1761:
1762: private void begin() throws SQLException, IOException {
1763: if (logger.isDebugEnabled())
1764: logger.debug("Begin command");
1765:
1766: currentTid = vdb.begin(login, persistentConnection,
1767: persistentConnectionId);
1768: sendToDriver(currentTid);
1769:
1770: transactionStarted = true;
1771: transactionHasAborted = false;
1772: queryExecutedInThisTransaction = false;
1773: writeQueryExecutedInThisTransaction = false;
1774: hasSavepoint = 0;
1775: }
1776:
1777: /*
1778: * reset transaction State, begin will be initiated by driver
1779: */
1780: private void resetTransactionState() {
1781: currentTid = 0;
1782: transactionStarted = false;
1783: transactionHasAborted = false;
1784: queryExecutedInThisTransaction = false;
1785: writeQueryExecutedInThisTransaction = false;
1786: hasSavepoint = 0;
1787: }
1788:
1789: private void rollback() throws SQLException, IOException {
1790: if (logger.isDebugEnabled())
1791: logger.debug("Rollback command");
1792:
1793: if (!transactionHasAborted)
1794: vdb.rollback(currentTid,
1795: writeQueryExecutedInThisTransaction);
1796: else if (logger.isWarnEnabled()) {
1797: logger.warn("Transaction " + currentTid
1798: + " was aborted by database");
1799: }
1800:
1801: // acknowledged the rollback (even if transaction is aborted)
1802: sendToDriver(currentTid);
1803:
1804: resetTransactionState();
1805: }
1806:
1807: private void setNamedSavepoint() throws SQLException, IOException {
1808: if (logger.isDebugEnabled())
1809: logger.debug("Set named savepoint command");
1810:
1811: String savepointName = in.readLongUTF();
1812:
1813: // Check if this is not a duplicate savepoints
1814: if (vdb.getRequestManager().hasSavepoint(new Long(currentTid),
1815: savepointName))
1816: throw new SQLException("A savepoint named " + savepointName
1817: + " already exists for transaction " + currentTid);
1818:
1819: vdb.setSavepoint(currentTid, savepointName);
1820: writeQueryExecutedInThisTransaction = true;
1821: hasSavepoint++;
1822: sendToDriver(true);
1823: }
1824:
1825: private void setUnnamedSavepoint() throws SQLException, IOException {
1826: if (logger.isDebugEnabled())
1827: logger.debug("Set unnamed savepoint command");
1828:
1829: int savepointId = vdb.setSavepoint(currentTid);
1830: writeQueryExecutedInThisTransaction = true;
1831: hasSavepoint++;
1832: sendToDriver(savepointId);
1833: }
1834:
1835: private void releaseSavepoint() throws SQLException, IOException {
1836: if (logger.isDebugEnabled())
1837: logger.debug("Release savepoint command");
1838: String savepointName = in.readLongUTF();
1839: vdb.releaseSavepoint(currentTid, savepointName);
1840: hasSavepoint--;
1841: sendToDriver(true);
1842: }
1843:
1844: private void rollbackToSavepoint() throws SQLException, IOException {
1845: if (logger.isDebugEnabled())
1846: logger.debug("Rollback to savepoint command");
1847: String savepointName = in.readLongUTF();
1848: vdb.rollback(currentTid, savepointName);
1849: hasSavepoint = vdb
1850: .getNumberOfSavepointsInTransaction(currentTid);
1851: sendToDriver(true);
1852: }
1853:
1854: private void retrieveReleaseSavepoint() throws IOException {
1855: if (logger.isDebugEnabled())
1856: logger.debug("Retrieve release savepoint command");
1857:
1858: // Wait for failover to be authorized
1859: waitForWritesFlushed(currentTid);
1860:
1861: String savepointName = in.readLongUTF();
1862: sendToDriver(!vdb.getRequestManager().hasSavepoint(
1863: new Long(currentTid), savepointName));
1864: }
1865:
1866: //
1867: // Decoding commands from the stream
1868: //
1869:
1870: /**
1871: * Read a request (without ResultSet parameters) send by the
1872: * <code>Connection</code> object.
1873: *
1874: * @return an instance of <code>AbstractRequest</code>
1875: * @throws IOException if an error occurs in the procotol
1876: * @throws BadJDBCApiUsageException if the decoded request does not match
1877: * anything we can handle
1878: * @see Request#Request(DriverBufferedInputStream)
1879: */
1880: private AbstractRequest decodeRequestFromStream()
1881: throws IOException, BadJDBCApiUsageException {
1882: // Get request from the socket
1883: Request driverRequest = new Request(in);
1884:
1885: String sqlQuery = driverRequest.getSqlQueryOrTemplate();
1886:
1887: if (!requestFactory.isAuthorizedRequest(sqlQuery))
1888: throw new BadJDBCApiUsageException(
1889: "The following statement is not authorized to execute on the cluster (check your user documentation): "
1890: + sqlQuery);
1891:
1892: AbstractRequest decodedRequest = requestFactory
1893: .requestFromString(sqlQuery, false, driverRequest
1894: .isEscapeProcessing(), driverRequest
1895: .getTimeoutInSeconds(), connectionLineSeparator);
1896: if (decodedRequest == null)
1897: throw new BadJDBCApiUsageException(
1898: "SQL statement does not match a query returning an update count ("
1899: + sqlQuery + ")");
1900:
1901: decodedRequest.setPreparedStatementParameters(driverRequest
1902: .getPreparedStatementParameters());
1903: decodedRequest.setIsAutoCommit(driverRequest.isAutoCommit());
1904: return decodedRequest;
1905: }
1906:
1907: /**
1908: * Read a request with ResultSet parameters send by the
1909: * <code>Connection</code> object.
1910: *
1911: * @param isExecuteQuery set to true if the received query is probably a read
1912: * statement (i.e. called by an executeQuery-like statement). This
1913: * will give priority to the parsing of read requests.
1914: * @return an instance of <code>AbstractRequest</code>
1915: * @throws IOException if an error occurs in the procotol
1916: * @throws BadJDBCApiUsageException if the request is not authorized to
1917: * execute
1918: * @see RequestWithResultSetParameters#RequestWithResultSetParameters(DriverBufferedInputStream)
1919: */
1920: private AbstractRequest decodeRequestWithResultSetParametersFromStream(
1921: boolean isExecuteQuery) throws IOException,
1922: BadJDBCApiUsageException {
1923: RequestWithResultSetParameters driverRequest = new RequestWithResultSetParameters(
1924: in);
1925:
1926: String sqlQuery = driverRequest.getSqlQueryOrTemplate();
1927:
1928: if (!requestFactory.isAuthorizedRequest(sqlQuery))
1929: throw new BadJDBCApiUsageException(
1930: "The following statement is not authorized to execute on the cluster (check your user documentation): "
1931: + sqlQuery);
1932:
1933: AbstractRequest decodedRequest = requestFactory
1934: .requestFromString(sqlQuery, isExecuteQuery,
1935: driverRequest.isEscapeProcessing(),
1936: driverRequest.getTimeoutInSeconds(),
1937: connectionLineSeparator);
1938: if (decodedRequest == null) {
1939: decodedRequest = new UnknownWriteRequest(sqlQuery,
1940: driverRequest.isEscapeProcessing(), driverRequest
1941: .getTimeoutInSeconds(),
1942: connectionLineSeparator);
1943: }
1944: decodedRequest.setPreparedStatementParameters(driverRequest
1945: .getPreparedStatementParameters());
1946: decodedRequest.setIsAutoCommit(driverRequest.isAutoCommit());
1947: decodedRequest.setMaxRows(driverRequest.getMaxRows());
1948: decodedRequest.setFetchSize(driverRequest.getFetchSize());
1949: decodedRequest.setCursorName(driverRequest.getCursorName());
1950: return decodedRequest;
1951: }
1952:
1953: /**
1954: * Log a transaction begin if needed for the AbstractRequest.<br />
1955: * The transaction is started only if needed (if the request is the first
1956: * write request for the current transaction)
1957: *
1958: * @param request a request
1959: * @throws SQLException if the transaction has aborted
1960: */
1961: private synchronized void logTransactionBegin(
1962: AbstractRequest request) throws SQLException {
1963: transactionStarted = setRequestParameters(request, login,
1964: currentTid, transactionStarted);
1965:
1966: if (connectionHasClosed)
1967: throw new SQLException(
1968: "Persistent connection is closed, cannot execute query "
1969: + request);
1970:
1971: if (transactionHasAborted)
1972: throw new SQLException(
1973: "Transaction is aborted, cannot execute query "
1974: + request);
1975:
1976: if (!transactionStarted)
1977: currentTid = 0;
1978: else {
1979: // Transaction not started, check if we should do a lazy start
1980: queryExecutedInThisTransaction = true;
1981: writeQueryExecutedInThisTransaction = true;
1982: }
1983: }
1984:
1985: /**
1986: * Set the login and transaction id on the given request. If the request is
1987: * autocommit and a transaction was started, the transaction is first commited
1988: * to return in autocommit mode.
1989: *
1990: * @param request The request to set
1991: * @param login user login to set
1992: * @param tid the transaction id to set
1993: * @return new value of transaction started
1994: */
1995: private boolean setRequestParameters(AbstractRequest request,
1996: String login, long tid, boolean transactionStarted)
1997: throws SQLException {
1998: request.setClientIpAddress(clientIpAddress);
1999: request.setLogin(login);
2000: request.setTransactionIsolation(transactionIsolation);
2001: request.setLineSeparator(connectionLineSeparator);
2002: request.setPersistentConnection(persistentConnection);
2003: request.setPersistentConnectionId(persistentConnectionId);
2004: request.setRetrieveSQLWarnings(retrieveSQLWarnings);
2005: request.setIsReadOnly(isReadOnly);
2006: if (request.isAutoCommit() && transactionStarted) {
2007: vdb.commit(tid, writeQueryExecutedInThisTransaction,
2008: !queryExecutedInThisTransaction);
2009: return false;
2010: } else
2011: request.setTransactionId(tid);
2012: request.setId(vdb.getNextRequestId());
2013: return transactionStarted;
2014: }
2015:
2016: private void setRequestParametersAndTransactionStarted(
2017: AbstractRequest request) throws SQLException {
2018: synchronized (this ) {
2019: transactionStarted = setRequestParameters(request, login,
2020: currentTid, transactionStarted);
2021:
2022: if (connectionHasClosed)
2023: throw new SQLException(
2024: "Persistent connection is closed, cannot execute query "
2025: + request);
2026:
2027: if (transactionHasAborted)
2028: throw new SQLException(
2029: "Transaction is aborted, cannot execute query "
2030: + request);
2031:
2032: if (!transactionStarted)
2033: currentTid = 0;
2034: else
2035: queryExecutedInThisTransaction = true;
2036: }
2037: }
2038:
2039: //
2040: // Request execution
2041: //
2042:
2043: private void statementExecuteQuery(SelectRequest decodedRequest)
2044: throws IOException, SQLException, BadJDBCApiUsageException {
2045: if (logger.isDebugEnabled())
2046: logger.debug("StatementExecuteQuery command");
2047: AbstractRequest request = decodedRequest;
2048: if (decodedRequest == null)
2049: request = decodeRequestWithResultSetParametersFromStream(true);
2050:
2051: if (request instanceof SelectRequest) {
2052: SelectRequest select = (SelectRequest) request;
2053: setRequestParametersAndTransactionStarted(select);
2054:
2055: // Here, if the transaction isolation level was set to SERIALIZABLE, we
2056: // need to broadcast the request to all controllers
2057: if (!request.isAutoCommit()
2058: && requestFactory
2059: .isBroadcastRequired(transactionIsolation)) {
2060: select.setMustBroadcast(true);
2061: writeQueryExecutedInThisTransaction = true;
2062: }
2063:
2064: // send the resultset
2065: ControllerResultSet crs = vdb.statementExecuteQuery(select);
2066:
2067: checkForConcurrentAbort(select);
2068:
2069: // If this is a remapping of the call, we have to send the id back
2070: if (decodedRequest != null)
2071: sendToDriver(select.getId());
2072:
2073: // send statement warnings
2074: sendToDriver(crs.getStatementWarnings());
2075:
2076: sendToDriver(crs);
2077:
2078: // streaming
2079: if (crs.hasMoreData())
2080: streamedResultSets.put(crs.getCursorName(), crs);
2081: } else if (request instanceof StoredProcedure) { // This is a stored procedure
2082: if (logger.isInfoEnabled())
2083: logger
2084: .info("Statement.executeQuery() detected a stored procedure ("
2085: + request
2086: + ") remapping the call to CallableStatement.executeQuery()");
2087: callableStatementExecuteQuery((StoredProcedure) request,
2088: false);
2089: return;
2090: } else
2091: throw new BadJDBCApiUsageException(
2092: "Statement.executeQuery() not allowed for requests returning an update count ("
2093: + request + ")");
2094: }
2095:
2096: /**
2097: * Execute a write request that returns an int.
2098: *
2099: * @param decodedRequest an already decoded request or null
2100: * @throws IOException if an error occurs with the socket
2101: * @throws SQLException if an error occurs while executing the request
2102: * @throws BadJDBCApiUsageException if a query returning a ResultSet is called
2103: */
2104: private void statementExecuteUpdate(
2105: AbstractWriteRequest decodedRequest) throws IOException,
2106: SQLException, BadJDBCApiUsageException {
2107: if (logger.isDebugEnabled())
2108: logger.debug("StatementExecuteUpdate command");
2109:
2110: AbstractRequest request = decodedRequest;
2111: if (request == null) {
2112: try {
2113: request = decodeRequestFromStream();
2114: } catch (BadJDBCApiUsageException e) {
2115: throw new BadJDBCApiUsageException(
2116: "Statement.executeUpdate() not allowed for requests returning a ResultSet",
2117: e);
2118: }
2119: logTransactionBegin(request);
2120: }
2121:
2122: try {
2123: AbstractWriteRequest write = (AbstractWriteRequest) request;
2124:
2125: // At this point we don't have a stored procedure
2126: // Send query id to driver for failover
2127: sendToDriver(request.getId());
2128:
2129: // Execute the request
2130: ExecuteUpdateResult result = vdb
2131: .statementExecuteUpdate(write);
2132: // Check if there was an issue with deadlock detection
2133: checkForConcurrentAbort(write);
2134: // Send SQL Warnings if any
2135: sendToDriver(result.getStatementWarnings());
2136: // Send result back
2137: sendToDriver(result.getUpdateCount());
2138: } catch (ClassCastException e) {
2139: if (request instanceof StoredProcedure) {
2140: if (logger.isInfoEnabled())
2141: logger
2142: .info("Statement.executeUpdate() detected a stored procedure ("
2143: + request
2144: + ") remapping the call to CallableStatement.executeUpdate()");
2145: callableStatementExecuteUpdate(
2146: (StoredProcedure) request, false);
2147: return;
2148: } else
2149: throw new BadJDBCApiUsageException(
2150: "Statement.executeUpdate() not allowed for requests returning a ResultSet ("
2151: + request + ")");
2152: }
2153: }
2154:
2155: private void statementExecuteUpdateWithKeys() throws IOException,
2156: SQLException, BadJDBCApiUsageException {
2157: if (logger.isDebugEnabled())
2158: logger.debug("StatementExecuteUpdateWithKeys command");
2159: try {
2160: // Get the request from the socket
2161: AbstractWriteRequest writeWithKeys;
2162: try {
2163: writeWithKeys = (AbstractWriteRequest) decodeRequestFromStream();
2164: } catch (BadJDBCApiUsageException e) {
2165: throw new BadJDBCApiUsageException(
2166: "Statement.executeUpdate() not allowed for requests returning a ResultSet",
2167: e);
2168: }
2169: logTransactionBegin(writeWithKeys);
2170:
2171: // Send query id to driver for failover
2172: sendToDriver(writeWithKeys.getId());
2173:
2174: // Execute the request
2175: GeneratedKeysResult updateCountWithKeys = vdb
2176: .statementExecuteUpdateWithKeys(writeWithKeys);
2177: // Check if there was an issue with deadlock detection
2178: checkForConcurrentAbort(writeWithKeys);
2179: // Send SQL Warnings if any
2180: sendToDriver(updateCountWithKeys.getStatementWarnings());
2181: // Send result back
2182: sendToDriver(updateCountWithKeys.getUpdateCount());
2183: ControllerResultSet rs = updateCountWithKeys
2184: .getControllerResultSet();
2185: sendToDriver(rs);
2186:
2187: // streaming
2188: if (rs.hasMoreData())
2189: streamedResultSets.put(rs.getCursorName(),
2190: updateCountWithKeys);
2191: } catch (ClassCastException e) {
2192: throw new BadJDBCApiUsageException(
2193: "RETURN_GENERATED_KEYS is not supported for stored procedures");
2194: }
2195:
2196: }
2197:
/**
 * Handle a Statement.execute() command and send the results to the driver.
 * <p>
 * The request is stamped with the connection parameters and the transaction
 * bookkeeping is updated while holding the lock on this object. Execution is
 * then routed one of three ways: directly to <code>vdb.statementExecute</code>
 * when the request factory says the request needs execute(), to
 * <code>callableStatementExecute</code> for a stored procedure, or to
 * <code>vdb.statementExecuteQuery</code> / <code>vdb.statementExecuteUpdate</code>
 * with the outcome wrapped in an <code>ExecuteResult</code>.
 * <p>
 * Each result is sent as a boolean flag (true for a result set, false for an
 * update count) followed by the value itself.
 *
 * @param decodedRequest an already decoded request, or null to decode the
 *          request from the driver stream
 * @throws IOException if an error occurs with the socket
 * @throws SQLException if the request cannot be decoded, the persistent
 *           connection is closed, the transaction has aborted or the
 *           execution fails
 */
private void statementExecute(AbstractRequest decodedRequest)
throws IOException, SQLException {
if (logger.isDebugEnabled())
logger.debug("statementExecute command");

AbstractRequest request = decodedRequest;
if (decodedRequest == null)
try {
request = decodeRequestWithResultSetParametersFromStream(false);
} catch (BadJDBCApiUsageException e) {
// Only the message is kept when converting to SQLException
throw new SQLException(e.getMessage());
}

// Same bookkeeping as setRequestParametersAndTransactionStarted()
synchronized (this ) {
transactionStarted = setRequestParameters(request, login,
currentTid, transactionStarted);

if (connectionHasClosed)
throw new SQLException(
"Persistent connection is closed, cannot execute query "
+ request);

if (transactionHasAborted)
throw new SQLException(
"Transaction is aborted, cannot execute query "
+ request);

if (!transactionStarted)
currentTid = 0;
else
queryExecutedInThisTransaction = true;
}

ExecuteResult result;
// Direct to Statement.execute() if this is an inline batch
// or if matching some statements (such as EXPLAIN ANALYZE in postgres)
if (requestFactory.requestNeedsExecute(request)) {
// Send query id to driver for failover
sendToDriver(request.getId());

writeQueryExecutedInThisTransaction = true;

if (request instanceof SelectRequest) { // Convert to an unknown write request as expected by underlying
// components (relates to SEQUOIA-674)
UnknownWriteRequest writeRequest = new UnknownWriteRequest(
request.getSqlOrTemplate(), request
.getEscapeProcessing(), request
.getTimeout(), request
.getLineSeparator());
// Copy the settings of the select request onto its replacement
writeRequest.setIsAutoCommit(request.isAutoCommit());
writeRequest.setTransactionId(request
.getTransactionId());
writeRequest.setTransactionIsolation(request
.getTransactionIsolation());
writeRequest.setId(request.getId());
writeRequest.setLogin(request.getLogin());
writeRequest.setPreparedStatementParameters(request
.getPreparedStatementParameters());
writeRequest.setTimeout(request.getTimeout());
writeRequest.setMaxRows(request.getMaxRows());
writeRequest.setPersistentConnection(request
.isPersistentConnection());
writeRequest.setPersistentConnectionId(request
.getPersistentConnectionId());
request = writeRequest;
}

result = vdb.statementExecute(request);
}
// Route to CallableStatement.execute() if this is a stored procedure
else if (request instanceof StoredProcedure) {
if (logger.isInfoEnabled())
logger
.info("Statement.execute() did detect a stored procedure ("
+ request
+ ") remapping the call to CallableStatement.execute()");

writeQueryExecutedInThisTransaction = true;

// The remapped call sends the id and the results itself
callableStatementExecute((StoredProcedure) request, false);
return;
} else { // Route SELECT to Statement.executeQuery() and others to
// Statement.executeUpdate()

// Send query id to driver for failover (driver still expect a
// Statement.execute() protocol)
sendToDriver(request.getId());

result = new ExecuteResult();
if (request instanceof SelectRequest) {
// Here, if the transaction isolation level was set to SERIALIZABLE, we
// need to broadcast the select request to all controllers
if (!request.isAutoCommit()
&& requestFactory
.isBroadcastRequired(transactionIsolation)) {
((SelectRequest) request).setMustBroadcast(true);
writeQueryExecutedInThisTransaction = true;
}

ControllerResultSet crs = vdb
.statementExecuteQuery((SelectRequest) request);
// call remapping: construct an ExecuteResult from a ControllerResultSet
result.addResult(crs);
result.setStatementWarnings(crs.getStatementWarnings());
// end of result list marker
result.addResult(-1);
// streaming
if (crs.hasMoreData())
streamedResultSets.put(crs.getCursorName(), crs);
} else {
writeQueryExecutedInThisTransaction = true;

ExecuteUpdateResult updateCount = vdb
.statementExecuteUpdate((AbstractWriteRequest) request);
// call remapping: construct an ExecuteResult from an ExecuteUpdateResult
result.setStatementWarnings(updateCount
.getStatementWarnings());
result.addResult(updateCount.getUpdateCount());
// end of result list marker (not added twice when the update count is
// already -1)
if (updateCount.getUpdateCount() != -1)
result.addResult(-1);
}
}

checkForConcurrentAbort(request);

// Send SQL Warnings if any
sendToDriver(result.getStatementWarnings());

// Each result is preceded by a flag: false for an update count, true for a
// result set
for (Iterator iter = result.getResults().iterator(); iter
.hasNext();) {
Object r = iter.next();
if (r instanceof Integer) {
sendToDriver(false);
sendToDriver(((Integer) r).intValue());
} else if (r instanceof ControllerResultSet) {
sendToDriver(true);
sendToDriver((ControllerResultSet) r);
} else
logger
.error("Unexpected result " + r
+ " in statementExecute for request "
+ request);
}
}
2342:
2343: /**
2344: * @param decodedProc Stored procedure if called from statementExecuteQuery(),
2345: * otherwise null
2346: * @param returnsOutParameters true if the call must return out/named
2347: * parameters
2348: * @throws BadJDBCApiUsageException
2349: */
2350: private void callableStatementExecuteQuery(
2351: StoredProcedure decodedProc, boolean returnsOutParameters)
2352: throws IOException, SQLException, BadJDBCApiUsageException {
2353: if (logger.isDebugEnabled())
2354: logger.debug("CallableStatementExecuteQuery command");
2355:
2356: StoredProcedure proc = decodedProc;
2357: if (proc == null) {
2358: AbstractRequest request = decodeRequestWithResultSetParametersFromStream(true);
2359: if (request == null)
2360: throw new ProtocolException(
2361: "Failed to decode stored procedure");
2362:
2363: try {
2364: // Fetch the query from the socket
2365: proc = (StoredProcedure) request;
2366:
2367: // Parse the query first to update the semantic information
2368: vdb.getRequestManager()
2369: .getParsingFromCacheOrParse(proc);
2370:
2371: // If procedure is read-only, we don't log lazy begin
2372: DatabaseProcedureSemantic semantic = proc.getSemantic();
2373: if ((semantic == null) || !semantic.isReadOnly())
2374: logTransactionBegin(proc);
2375: } catch (ClassCastException e) {
2376: if (request instanceof SelectRequest) {
2377: if (logger.isInfoEnabled())
2378: logger
2379: .info("CallableStatement.executeQuery() did not detect a stored procedure ("
2380: + request
2381: + ") remapping the call to Statement.executeQuery()");
2382: statementExecuteQuery((SelectRequest) request);
2383: if (returnsOutParameters)
2384: sendNamedAndOutParametersToDriver(request);
2385: return;
2386: }
2387: throw new BadJDBCApiUsageException(
2388: "Unhandled stored procedure call in " + request);
2389: }
2390: }
2391:
2392: setRequestParametersAndTransactionStarted(proc);
2393:
2394: if (decodedProc == null) { // Send query id to driver for failover
2395: sendToDriver(proc.getId());
2396: }
2397: // else we come from statement.executeQuery and we should not send the id
2398:
2399: // Execute the stored procedure
2400: ControllerResultSet sprs = vdb
2401: .callableStatementExecuteQuery(proc);
2402: checkForConcurrentAbort(proc);
2403:
2404: // Send SQL Warnings if any
2405: sendToDriver(sprs.getStatementWarnings());
2406:
2407: sendToDriver(sprs);
2408:
2409: // streaming
2410: if (sprs.hasMoreData())
2411: streamedResultSets.put(sprs.getCursorName(), sprs);
2412:
2413: if (returnsOutParameters)
2414: sendNamedAndOutParametersToDriver(proc);
2415: }
2416:
2417: /**
2418: * @param sp Stored procedure if called from statementExecuteUpdate(),
2419: * otherwise null
2420: * @param returnsOutParameters true if the call must return out/named
2421: * parameters
2422: */
2423: private void callableStatementExecuteUpdate(StoredProcedure sp,
2424: boolean returnsOutParameters) throws IOException,
2425: SQLException, BadJDBCApiUsageException {
2426: if (logger.isDebugEnabled())
2427: logger.debug("CallableStatementExecuteUpdate command");
2428:
2429: if (sp == null) {
2430: AbstractRequest request;
2431: try {
2432: request = decodeRequestFromStream();
2433: } catch (BadJDBCApiUsageException e) {
2434: throw new BadJDBCApiUsageException(
2435: "CallableStatement.executeUpdate() not allowed for requests returning a ResultSet ",
2436: e);
2437: }
2438: logTransactionBegin(request);
2439:
2440: try {
2441: // Fetch the query from the socket
2442: sp = (StoredProcedure) request;
2443: } catch (ClassCastException e) {
2444: if (request instanceof AbstractWriteRequest) {
2445: if (logger.isInfoEnabled())
2446: logger
2447: .info("CallableStatement.executeUpdate() did not detect a stored procedure ("
2448: + request
2449: + ") remapping the call to Statement.executeUpdate()");
2450: statementExecuteUpdate((AbstractWriteRequest) request);
2451: if (returnsOutParameters)
2452: sendNamedAndOutParametersToDriver(request);
2453: return;
2454: }
2455: throw new BadJDBCApiUsageException(
2456: "Unhandled stored procedure call in " + request);
2457: }
2458: }
2459:
2460: // Send query id to driver for failover
2461: sendToDriver(sp.getId());
2462:
2463: // Execute the query
2464: ExecuteUpdateResult result = vdb
2465: .callableStatementExecuteUpdate(sp);
2466: checkForConcurrentAbort(sp);
2467: // Send SQL Warnings if any
2468: sendToDriver(result.getStatementWarnings());
2469: // Send result back
2470: sendToDriver(result.getUpdateCount());
2471:
2472: if (returnsOutParameters)
2473: sendNamedAndOutParametersToDriver(sp);
2474: }
2475:
2476: /**
2477: * @param sp Stored procedure if called from statementExecute(), otherwise
2478: * null.
2479: * @param returnsOutParameters true if the call must return out/named
2480: * parameters
2481: */
2482: private void callableStatementExecute(StoredProcedure sp,
2483: boolean returnsOutParameters) throws IOException,
2484: SQLException {
2485: if (logger.isDebugEnabled())
2486: logger.debug("CallableStatementExecute command");
2487:
2488: if (sp == null) {
2489: AbstractRequest request;
2490: try {
2491: request = decodeRequestWithResultSetParametersFromStream(false);
2492: } catch (BadJDBCApiUsageException e) {
2493: throw new SQLException(e.getMessage());
2494: }
2495: if (request == null)
2496: throw new ProtocolException(
2497: "Failed to decode stored procedure");
2498: try {
2499: // Fetch the query from the socket
2500: sp = (StoredProcedure) request;
2501:
2502: // Parse the query first to update the semantic information
2503: vdb.getRequestManager().getParsingFromCacheOrParse(sp);
2504:
2505: // If procedure is read-only, we don't log lazy begin
2506: DatabaseProcedureSemantic semantic = sp.getSemantic();
2507: if ((semantic == null) || !semantic.isReadOnly())
2508: logTransactionBegin(sp);
2509: } catch (ClassCastException e) {
2510: if (logger.isInfoEnabled())
2511: logger
2512: .info("CallableStatement.execute() did not detect a stored procedure ("
2513: + request
2514: + ") remapping the call to Statement.execute()");
2515: statementExecute(request);
2516: if (returnsOutParameters)
2517: sendNamedAndOutParametersToDriver(request);
2518: return;
2519: }
2520: }
2521:
2522: setRequestParametersAndTransactionStarted(sp);
2523:
2524: // Send query id to driver for failover
2525: sendToDriver(sp.getId());
2526:
2527: // Execute the query
2528: ExecuteResult result = vdb.callableStatementExecute(sp);
2529: checkForConcurrentAbort(sp);
2530:
2531: // Send SQL Warnings if any
2532: sendToDriver(result.getStatementWarnings());
2533:
2534: for (Iterator iter = result.getResults().iterator(); iter
2535: .hasNext();) {
2536: Object r = iter.next();
2537:
2538: if (r instanceof Integer) {
2539: sendToDriver(false);
2540: sendToDriver(((Integer) r).intValue());
2541: } else if (r instanceof ControllerResultSet) {
2542: sendToDriver(true);
2543: sendToDriver((ControllerResultSet) r);
2544: } else
2545: logger.error("Unexepected result " + r
2546: + " in callableStatementExecute for request "
2547: + sp);
2548: }
2549: if (returnsOutParameters)
2550: sendNamedAndOutParametersToDriver(sp);
2551: }
2552:
2553: private void sendNamedAndOutParametersToDriver(
2554: AbstractRequest request) throws IOException,
2555: ProtocolException {
2556: if (request instanceof StoredProcedure) {
2557: try {
2558: StoredProcedure proc = (StoredProcedure) request;
2559: // First send the out parameters
2560: List outParamIndexes = proc.getOutParameterIndexes();
2561: if (outParamIndexes != null) {
2562: // Now send each param (index, then serializer and serialized object)
2563: for (Iterator iter = outParamIndexes.iterator(); iter
2564: .hasNext();) {
2565: Integer index = (Integer) iter.next();
2566: sendToDriver(index.intValue());
2567: Object object = proc
2568: .getOutParameterValue(index);
2569: sendObjectToDriver(object);
2570: }
2571: }
2572: sendToDriver(0);
2573:
2574: // Fetch the named parameters
2575: List namedParamNames = proc.getNamedParameterNames();
2576: if (namedParamNames != null) {
2577: for (Iterator iter = namedParamNames.iterator(); iter
2578: .hasNext();) {
2579: // Send param name first
2580: String paramName = (String) iter.next();
2581: sendToDriver(paramName);
2582: // Now send value (serializer first then serialized object)
2583: Object object = proc
2584: .getNamedParameterValue(paramName);
2585: sendObjectToDriver(object);
2586: }
2587: }
2588: sendToDriver("0");
2589: } catch (NotImplementedException e) {
2590: String msg = "Unable to serialize parameter result for request "
2591: + request;
2592: logger.error(msg, e);
2593: throw new ProtocolException(msg);
2594: }
2595: } else
2596: // Not a stored procedure (remapped call)
2597: {
2598: // No out parameter
2599: sendToDriver(0);
2600: // No named parameter
2601: sendToDriver("0");
2602: }
2603: }
2604:
2605: /**
2606: * Send an object to the driver (first tag then serialized object).
2607: *
2608: * @param object object to send
2609: * @throws IOException if an error occurs with the socket
2610: * @throws NotImplementedException if the object cannot be serialized
2611: */
2612: private void sendObjectToDriver(Object object) throws IOException,
2613: NotImplementedException {
2614: if (object == null) { // Special tag for null objects (nothing to send)
2615: TypeTag.JAVA_NULL.sendToStream(out);
2616: } else { // Regular object
2617: Serializer s = SQLDataSerialization.getSerializer(object);
2618: s.getTypeTag().sendToStream(out);
2619: s.sendToStream(object, out);
2620: }
2621: }
2622:
2623: /**
2624: * Retrieve the result from the request result failover cache for the given
2625: * request id. If the result is not found, the scheduler is checked in case
2626: * the query is currently executing. If the query is executing, we wait until
2627: * it has completed and then return the result. If no result is found, null is
2628: * returned.
2629: *
2630: * @param requestId the request unique identifier
2631: * @return the request result or null if not found
2632: */
2633: private Serializable getResultForRequestId(long requestId) {
2634: waitForWritesFlushed(requestId);
2635:
2636: Serializable result = ((DistributedVirtualDatabase) vdb)
2637: .getRequestResultFailoverCache().retrieve(requestId);
2638:
2639: if (result == null) { // Check if query is not currently executing
2640: AbstractScheduler scheduler = vdb.getRequestManager()
2641: .getScheduler();
2642: if (scheduler.isActiveRequest(requestId)) {
2643: // Wait for request completion and then retrieve the result from the
2644: // failover cache.
2645: scheduler.waitForRequestCompletion(requestId);
2646: result = ((DistributedVirtualDatabase) vdb)
2647: .getRequestResultFailoverCache().retrieve(
2648: requestId);
2649: }
2650: }
2651: return result;
2652: }
2653:
2654: /**
2655: * Retrieve the result of a stored procedure that returns multiple results,
2656: * out parameters and named parameters. Returns null to the driver if the
2657: * result has not been found.
2658: *
2659: * @throws IOException if an error occurs with the socket
2660: * @throws SQLException if an error occurs while retrieving the result
2661: */
2662: private void retrieveExecuteResultWithParameters()
2663: throws IOException, SQLException {
2664: if (logger.isDebugEnabled())
2665: logger
2666: .debug("Retrieve execute result with parameters command");
2667:
2668: long requestId = in.readLong();
2669:
2670: if (vdb.isDistributed()) {
2671: Serializable result = getResultForRequestId(requestId);
2672:
2673: if (result != null) {
2674: // Cache hit
2675: if (result instanceof StoredProcedureCallResult) {
2676: StoredProcedureCallResult spResult = (StoredProcedureCallResult) result;
2677:
2678: // re-send statement warnings
2679: sendToDriver(((ExecuteResult) spResult.getResult())
2680: .getStatementWarnings());
2681: // Send results first
2682: for (Iterator iter = ((ExecuteResult) spResult
2683: .getResult()).getResults().iterator(); iter
2684: .hasNext();) {
2685: Object element = iter.next();
2686: if (element instanceof Integer) {
2687: sendToDriver(false);
2688: sendToDriver(((Integer) element).intValue());
2689: } else if (element instanceof ControllerResultSet) {
2690: sendToDriver(true);
2691: sendToDriver((ControllerResultSet) element);
2692: } else
2693: logger
2694: .error("Unexpected result "
2695: + element
2696: + " in statementExecute for request "
2697: + requestId);
2698: }
2699:
2700: // Send parameters
2701: sendNamedAndOutParametersToDriver(spResult
2702: .getStoredProcedure());
2703: } else
2704: throw new SQLException(
2705: "Expected StoredProcedureCallResult for request "
2706: + requestId + " failover but got "
2707: + result);
2708: } else {
2709: // No cache hit
2710: sendToDriver((SQLWarning) null);
2711: sendToDriver(true);
2712: sendToDriver((ControllerResultSet) null);
2713: }
2714: } else {
2715: throw new SQLException(
2716: "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2717: }
2718: }
2719:
2720: /**
2721: * Retrieve the result of a stored procedure that returns an int, out
2722: * parameters and named parameters. Returns -1 to the driver if the result has
2723: * not been found.
2724: *
2725: * @throws IOException if an error occurs with the socket
2726: * @throws SQLException if an error occurs while retrieving the result
2727: */
2728: private void retrieveExecuteUpdateResultWithParameters()
2729: throws IOException, SQLException {
2730: if (logger.isDebugEnabled())
2731: logger
2732: .debug("Retrieve execute update with parameters command");
2733:
2734: long requestId = in.readLong();
2735:
2736: if (vdb.isDistributed()) {
2737: Serializable result = getResultForRequestId(requestId);
2738:
2739: if (result != null) {
2740: // Cache hit
2741: if (result instanceof StoredProcedureCallResult) {
2742: StoredProcedureCallResult spResult = (StoredProcedureCallResult) result;
2743: // Send warnings
2744: ExecuteUpdateResult r = (ExecuteUpdateResult) spResult
2745: .getResult();
2746: // re-send statement warnings
2747: sendToDriver(r.getStatementWarnings());
2748: // Send udpate count
2749: sendToDriver(r.getUpdateCount());
2750: // Send parameters
2751: sendNamedAndOutParametersToDriver(spResult
2752: .getStoredProcedure());
2753: } else
2754: throw new SQLException(
2755: "Expected StoredProcedureCallResult for request "
2756: + requestId + " failover but got "
2757: + result);
2758: } else {
2759: // No cache hit
2760: sendToDriver((SQLWarning) null);
2761: sendToDriver(-1);
2762: }
2763: } else {
2764: throw new SQLException(
2765: "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2766: }
2767: }
2768:
2769: /**
2770: * Retrieve the result of a stored procedure that returns a ResultSet, out
2771: * parameters and named parameters. Returns null to the driver if the result
2772: * has not been found.
2773: *
2774: * @throws IOException if an error occurs with the socket
2775: * @throws SQLException if an error occurs while retrieving the result
2776: */
2777: private void retrieveExecuteQueryResultWithParameters()
2778: throws IOException, SQLException {
2779: if (logger.isDebugEnabled())
2780: logger
2781: .debug("Retrieve execute update with parameters command");
2782:
2783: long requestId = in.readLong();
2784:
2785: if (vdb.isDistributed()) {
2786: Serializable result = getResultForRequestId(requestId);
2787:
2788: if (result != null) {
2789: // Cache hit
2790: if (result instanceof StoredProcedureCallResult) {
2791: StoredProcedureCallResult spResult = (StoredProcedureCallResult) result;
2792: // re-send statement warnings
2793: sendToDriver(((ControllerResultSet) spResult
2794: .getResult()).getStatementWarnings());
2795: // Send ResultSet
2796: sendToDriver((ControllerResultSet) spResult
2797: .getResult());
2798: // Send parameters
2799: sendNamedAndOutParametersToDriver(spResult
2800: .getStoredProcedure());
2801: } else
2802: throw new SQLException(
2803: "Expected StoredProcedureCallResult for request "
2804: + requestId + " failover but got "
2805: + result);
2806: } else {
2807: // No cache hit
2808: sendToDriver((SQLWarning) null);
2809: sendToDriver((ControllerResultSet) null);
2810: }
2811: } else {
2812: throw new SQLException(
2813: "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2814: }
2815: }
2816:
2817: /**
2818: * Retrieve the result of a write request that returns an int. Returns -1 to
2819: * the driver if the result has not been found.
2820: *
2821: * @throws IOException if an error occurs with the socket
2822: * @throws SQLException if an error occurs while retrieving the result
2823: */
2824: private void retrieveExecuteUpdateResult() throws IOException,
2825: SQLException {
2826: if (logger.isDebugEnabled())
2827: logger.debug("Retrieve execute update result command");
2828:
2829: long requestId = in.readLong();
2830:
2831: if (vdb.isDistributed()) {
2832: // Result will always be null since result is only stored in the recovery
2833: // log but this call ensures that the query execution has completed.
2834: getResultForRequestId(requestId);
2835:
2836: // We don't cache the warnings for write queries
2837: sendToDriver(new SQLWarning(
2838: Translate
2839: .get(
2840: "virtualdatabase.distributed.write.failover.lost.warnings", requestId))); //$NON-NLS-1$
2841: sendToDriver(vdb.getRequestManager().getRecoveryLog()
2842: .getUpdateCountResultForQuery(requestId));
2843: } else {
2844: throw new SQLException(
2845: "Transparent failover for statements that return an update count is only supported in distributed configurations.");
2846: }
2847: }
2848:
2849: /**
2850: * Retrieve the result of a request that returns multiple results. Returns
2851: * null to the driver if the result has not been found.
2852: *
2853: * @throws IOException if an error occurs with the socket
2854: * @throws SQLException if an error occurs while retrieving the result
2855: */
2856: private void retrieveExecuteResult() throws IOException,
2857: SQLException {
2858: if (logger.isDebugEnabled())
2859: logger.debug("Retrieve execute result command");
2860:
2861: long requestId = in.readLong();
2862:
2863: if (vdb.isDistributed()) {
2864: Serializable result = getResultForRequestId(requestId);
2865:
2866: if (result != null) {
2867: // Cache hit
2868: // re-send warnings
2869: SQLWarning cachedWarns = ((ExecuteResult) result)
2870: .getStatementWarnings();
2871: sendToDriver(cachedWarns);
2872: // and result
2873: for (Iterator iter = ((ExecuteResult) result)
2874: .getResults().iterator(); iter.hasNext();) {
2875: Object element = iter.next();
2876: if (element instanceof Integer) {
2877: sendToDriver(false);
2878: sendToDriver(((Integer) element).intValue());
2879: } else if (element instanceof ControllerResultSet) {
2880: sendToDriver(true);
2881: sendToDriver((ControllerResultSet) element);
2882: } else
2883: logger.error("Unexpected result " + element
2884: + " in statementExecute for request "
2885: + requestId);
2886: }
2887: } else {
2888: try {
2889: // The query may have been remapped
2890: int updateCount = vdb.getRequestManager()
2891: .getRecoveryLog()
2892: .getUpdateCountResultForQuery(requestId);
2893: sendToDriver((SQLWarning) null);
2894: if (updateCount != -1) { // Build a response with the update count and add 'no more result'
2895: sendToDriver(false);
2896: sendToDriver(updateCount);
2897: sendToDriver(false);
2898: sendToDriver(-1);
2899: } else { // Not found
2900: sendToDriver(true);
2901: sendToDriver((ControllerResultSet) null);
2902: }
2903: } catch (SQLException ex) { // No cache hit
2904: sendToDriver((SQLWarning) null);
2905: sendToDriver(true);
2906: sendToDriver((ControllerResultSet) null);
2907: }
2908: }
2909: } else {
2910: throw new SQLException(
2911: "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2912: }
2913: }
2914:
2915: /**
2916: * Retrieve the result of a write request that returns an int and generated
2917: * keys. Returns -1 to the driver if the result has not been found.
2918: *
2919: * @throws IOException if an error occurs with the socket
2920: * @throws SQLException if an error occurs while retrieving the result
2921: */
2922: private void retrieveExecuteUpdateWithKeysResult()
2923: throws IOException, SQLException {
2924: if (logger.isDebugEnabled())
2925: logger
2926: .debug("Retrieve execute update with keys result command");
2927:
2928: long requestId = in.readLong();
2929:
2930: if (vdb.isDistributed()) {
2931: Serializable result = getResultForRequestId(requestId);
2932:
2933: if (result != null) {
2934: // Cache hit
2935: sendToDriver(((GeneratedKeysResult) result)
2936: .getStatementWarnings());
2937: sendToDriver(((GeneratedKeysResult) result)
2938: .getUpdateCount());
2939: sendToDriver(((GeneratedKeysResult) result)
2940: .getControllerResultSet());
2941: } else {
2942: // No cache hit
2943: sendToDriver((SQLWarning) null);
2944: sendToDriver(-1);
2945: }
2946: } else {
2947: throw new SQLException(
2948: "Transparent failover for statements that return generated keys is only supported in distributed configurations.");
2949: }
2950: }
2951:
2952: /**
2953: * Retrieve the result of a request that returns a ResultSet. Returns null to
2954: * the driver if the result has not been found.
2955: *
2956: * @throws IOException if an error occurs with the socket
2957: * @throws SQLException if an error occurs while retrieving the result
2958: */
2959: private void retrieveExecuteQueryResult() throws IOException,
2960: SQLException {
2961: if (logger.isDebugEnabled())
2962: logger.debug("Retrieve execute query result command");
2963:
2964: long requestId = in.readLong();
2965:
2966: if (vdb.isDistributed()) {
2967: Serializable result = getResultForRequestId(requestId);
2968:
2969: if (result != null) {
2970: // Cache hit
2971: sendToDriver(((ControllerResultSet) result)
2972: .getStatementWarnings());
2973: sendToDriver((ControllerResultSet) result);
2974: } else {
2975: // No cache hit
2976: sendToDriver((SQLWarning) null);
2977: sendToDriver((ControllerResultSet) null);
2978: }
2979: } else {
2980: throw new SQLException(
2981: "Transparent failover for statements that return a ResultSet is only supported in distributed configurations.");
2982: }
2983: }
2984:
2985: /**
2986: * Retrieve the result of a transaction commit. If the transaction was not
2987: * commited, we commit it and acknowledge the driver back in all cases.
2988: *
2989: * @throws IOException if an error occurs with the socket
2990: * @throws SQLException if an error occurs while retrieving the result
2991: */
2992: private void retrieveCommitResult() throws IOException,
2993: SQLException {
2994: if (logger.isDebugEnabled())
2995: logger.debug("Retrieve commit command");
2996:
2997: waitForWritesFlushed(currentTid);
2998:
2999: if (transactionHasAborted) {
3000: if (logger.isWarnEnabled()) {
3001: logger.warn("Transaction " + currentTid
3002: + " was aborted by database");
3003: }
3004: // mimic the behavior of the commit() method
3005: sendToDriver(currentTid);
3006: return;
3007: }
3008:
3009: boolean retry;
3010: do {
3011: retry = false;
3012: String commitStatus = vdb.getRequestManager()
3013: .getRecoveryLog().getCommitStatusForTransaction(
3014: currentTid);
3015:
3016: if (LogEntry.MISSING.equals(commitStatus)) {
3017: if (writeQueryExecutedInThisTransaction) {
3018: // Transaction was not commited yet, let's commit
3019: commit();
3020: } else {
3021: // If this was a read-only transaction, it was never started on this
3022: // controller and therefore there is no need to commit. It is ok to
3023: // let the user believe that the transaction successfully commited
3024: // since it was read-only and had no effect on the database anyway.
3025: // We have to tell the client that the transaction was committed
3026: // successfully anyway, otherwise the application will hang waiting
3027: // for this answer.
3028: sendToDriver(currentTid);
3029: resetTransactionState();
3030: return;
3031: }
3032: } else if (LogEntry.SUCCESS.equals(commitStatus)) {
3033: // Transaction was already commited, acknowledge the transaction id
3034: sendToDriver(currentTid);
3035:
3036: resetTransactionState();
3037: } else if (LogEntry.FAILED.equals(commitStatus)) {
3038: logger.warn("Commit of transaction " + currentTid
3039: + " failed");
3040: // commit failed
3041: throw new SQLException("Commit of transaction "
3042: + currentTid + " failed");
3043: } else {
3044: /*
3045: * Status is executing or unknown, if status is executing and we have
3046: * enabled backends locally, we have to wait for the final status to be
3047: * updated in the recovery log
3048: */
3049:
3050: retry = LogEntry.EXECUTING.equals(commitStatus)
3051: && (vdb.getRequestManager().getLoadBalancer()
3052: .getNumberOfEnabledBackends() > 0);
3053: if (!retry)
3054: throw new SQLException("Commit of transaction "
3055: + currentTid
3056: + " is in unknown or executing state");
3057: }
3058: } while (retry);
3059:
3060: }
3061:
3062: /**
3063: * Retrieve the result of a transaction rollback. If the transaction was not
3064: * rollbacked, we rollback and acknowledge the driver back in all cases.
3065: *
3066: * @throws IOException if an error occurs with the socket
3067: * @throws SQLException if an error occurs while retrieving the result
3068: */
3069: private void retrieveRollbackResult() throws IOException,
3070: SQLException {
3071: if (logger.isDebugEnabled())
3072: logger.debug("Retrieve rollback command");
3073:
3074: waitForWritesFlushed(currentTid);
3075:
3076: if (!transactionHasAborted) {
3077: String rollbackStatus = vdb.getRequestManager()
3078: .getRecoveryLog().getRollbackStatusForTransaction(
3079: currentTid);
3080:
3081: if (LogEntry.MISSING.equals(rollbackStatus))
3082: // Transaction was not rollbacked yet, let's do it
3083: rollback();
3084: else if (LogEntry.SUCCESS.equals(rollbackStatus)
3085: || LogEntry.FAILED.equals(rollbackStatus)) {
3086: // Transaction was already rollbacked, acknowledge the transaction id
3087: sendToDriver(currentTid);
3088: resetTransactionState();
3089: } else { // UNKOWN OR EXECUTING state
3090: if (vdb.isDistributed()) {
3091: ((DistributedRequestManager) vdb
3092: .getRequestManager())
3093: .cleanupRollbackFromOtherController(currentTid);
3094: sendToDriver(currentTid);
3095: resetTransactionState();
3096: } else {
3097: // rollback cannot fail locally, so notify right away even if the
3098: // rollback has not fully completed
3099: sendToDriver(currentTid);
3100: resetTransactionState();
3101: }
3102: }
3103: } else {
3104: if (logger.isWarnEnabled()) {
3105: logger.warn("Transaction " + currentTid
3106: + " was aborted by database");
3107: }
3108: // mimic the behavior of the rollback() method
3109: sendToDriver(currentTid);
3110: }
3111: }
3112:
3113: private void waitForWritesFlushed(long requestIdOrTransactionId) {
3114: // In non-distributed configuration, there is no failover cache, so there
3115: // is no need to wait
3116: if (!vdb.isDistributed())
3117: return;
3118:
3119: DistributedVirtualDatabase dvdb = (DistributedVirtualDatabase) vdb;
3120: dvdb
3121: .flushGroupCommunicationMessagesLocally(requestIdOrTransactionId
3122: & DistributedRequestManager.CONTROLLER_ID_BIT_MASK);
3123: dvdb
3124: .waitForGroupCommunicationMessagesLocallyFlushed(requestIdOrTransactionId
3125: & DistributedRequestManager.CONTROLLER_ID_BIT_MASK);
3126: }
3127:
3128: /**
3129: * Serialize a ControllerResultSet answer, prefixed with the appropriate
3130: * TypeTag. Note that this will be deserialized in a DriverResultSet.
3131: *
3132: * @param crs the resultset to send
3133: * @throws IOException stream error
3134: */
3135: private void sendToDriver(ControllerResultSet crs)
3136: throws IOException {
3137: /**
3138: * If a (buggy) backend was returning a null ResultSet, we would have failed
3139: * much earlier in
3140: * {@link ControllerResultSet#ControllerResultSet(AbstractRequest, java.sql.ResultSet, MetadataCache, Statement, boolean)
3141: */
3142: /*
3143: * So we can safely use "null" as a special value during transparent
3144: * failover when the controller's request cache hasn't found a result for a
3145: * given request.
3146: */
3147: if (null == crs) {
3148: TypeTag.NULL_RESULTSET.sendToStream(out);
3149: out.flush();
3150: return;
3151: }
3152:
3153: try {
3154: crs.initSerializers();
3155: } catch (NotImplementedException nie) { // we don't know how to serialize something
3156: sendToDriver(nie);
3157: return;
3158: }
3159:
3160: TypeTag.RESULTSET.sendToStream(out);
3161: crs.sendToStream(out);
3162: }
3163:
3164: /**
3165: * Send a protocol String, prefixed with the appropriate TypeTag
3166: */
3167: private void sendToDriver(String str) throws IOException {
3168: TypeTag.NOT_EXCEPTION.sendToStream(out);
3169: out.writeLongUTF(str);
3170: out.flush();
3171: }
3172:
3173: /**
3174: * Send a protocol boolean, prefixed with the appropriate TypeTag
3175: */
3176: private void sendToDriver(boolean b) throws IOException {
3177: TypeTag.NOT_EXCEPTION.sendToStream(out);
3178: out.writeBoolean(b);
3179: out.flush();
3180: }
3181:
3182: /**
3183: * Send a protocol int, prefixed with the appropriate TypeTag
3184: */
3185: private void sendToDriver(int i) throws IOException {
3186: TypeTag.NOT_EXCEPTION.sendToStream(out);
3187: out.writeInt(i);
3188: out.flush();
3189: }
3190:
3191: /**
3192: * Send a protocol long, prefixed with the appropriate TypeTag
3193: */
3194: private void sendToDriver(long l) throws IOException {
3195: TypeTag.NOT_EXCEPTION.sendToStream(out);
3196: out.writeLong(l);
3197: out.flush();
3198: }
3199:
3200: private void sendToDriver(SQLWarning s) throws IOException {
3201: if (s != null) {
3202: sendToDriver(true);
3203: TypeTag.BACKEND_EXCEPTION.sendToStream(out);
3204: new BackendDriverException(s).sendToStream(out);
3205: } else
3206: sendToDriver(false);
3207: }
3208:
3209: private void sendToDriver(Exception e) throws IOException {
3210: TypeTag.EXCEPTION.sendToStream(out);
3211: // This is the place where we convert Exceptions to something
3212: // serializable and that the driver can understand
3213: // So this is the place where it's possible to trap all unknown exceptions
3214:
3215: if (e instanceof SQLException) { // we assume that an SQLexception comes from the backend
3216:
3217: // since this is currently false because some ControllerCoreExceptions
3218: // subclass SQLException, here are a few workarounds
3219: if (e instanceof NoMoreBackendException
3220: || e instanceof NoMoreControllerException
3221: || e instanceof NotImplementedException) {
3222: TypeTag.CORE_EXCEPTION.sendToStream(out);
3223: new ControllerCoreException(e).sendToStream(out);
3224: return;
3225: }
3226:
3227: // non-workaround, regular SQLException from backend
3228: TypeTag.BACKEND_EXCEPTION.sendToStream(out);
3229: new BackendDriverException(e).sendToStream(out);
3230: return;
3231: }
3232:
3233: // else we assume this is an exception from the core (currently...?)
3234: TypeTag.CORE_EXCEPTION.sendToStream(out);
3235: new ControllerCoreException(e).sendToStream(out);
3236: return;
3237:
3238: }
3239:
3240: /**
3241: * Implements streaming: send the next ResultSet chunk to driver, pulling it
3242: * from ControllerResultSet. The driver decides of the chunk size at each
3243: * call. Note that virtualdatabase streaming is independent from backend
3244: * streaming (which may not be supported). They even could be configured with
3245: * two different fetchSize -s (it's not currently the case).
3246: * <p>
3247: * This is a real issue: in case of a low fetchsize hint ignored by the driver
3248: * of the backend, then the whole backend resultset stays in the memory on the
3249: * controller. And we probably cannot know how many rows did it pulled out.
3250: *
3251: * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#executeStatementExecuteQueryOnBackend(SelectRequest,
3252: * org.continuent.sequoia.controller.backend.DatabaseBackend,
3253: * org.continuent.sequoia.controller.backend.BackendWorkerThread,
3254: * Connection, org.continuent.sequoia.controller.backend.MetadataCache)
3255: * @see ControllerResultSet#fetchData(int)
3256: */
3257: private void fetchNextResultSetRows() throws IOException,
3258: SQLException {
3259: if (logger.isDebugEnabled())
3260: logger.debug("FetchNextResultSetRows command");
3261:
3262: String cursorName = in.readLongUTF();
3263: int fetchSize = in.readInt();
3264: ControllerResultSet crs = (ControllerResultSet) streamedResultSets
3265: .get(cursorName);
3266: if (crs == null) {
3267: sendToDriver(new SQLException(
3268: "No valid ControllerResultSet to fetch data from"));
3269: } else {
3270: // refresh ControllerResultSet with a new chunk of rows
3271: crs.fetchData(fetchSize);
3272:
3273: // send it
3274: TypeTag.NOT_EXCEPTION.sendToStream(out);
3275: crs.sendRowsToStream(out);
3276:
3277: // At this point we could probably data.clear() already sent as a memory
3278: // optimization, but still in doubt about others using it we leave it as
3279: // is.
3280:
3281: if (!crs.hasMoreData())
3282: streamedResultSets.remove(cursorName);
3283: }
3284: }
3285:
3286: //
3287: // Public API
3288: //
3289:
3290: /**
3291: * Return the current transaction id (should be 0 if not in a transaction).
3292: *
3293: * @return the current transaction id
3294: */
3295: public long getCurrentTransactionId() {
3296: return currentTid;
3297: }
3298:
3299: /**
3300: * Return the persistent connection id.
3301: *
3302: * @return the persistent connection id
3303: */
3304: public long getPersistentConnectionId() {
3305: return persistentConnectionId;
3306: }
3307:
3308: /**
3309: * Get time active
3310: *
3311: * @return time active since started
3312: */
3313: public long getTimeActive() {
3314: return ((System.currentTimeMillis() - in.getDateCreated()) / 1000);
3315: }
3316:
3317: /**
3318: * @return Returns the login of the current user.
3319: */
3320: public String getUser() {
3321: if (user == null) {
3322: return "No user connected";
3323: }
3324: return user.getLogin();
3325: }
3326:
3327: //
3328: // Public API
3329: //
3330:
3331: /**
3332: * Notify the abort of the given transaction which should match the current
3333: * transaction id of this thread else an exception will be thrown.
3334: *
3335: * @param tid the transaction identifier to abort
3336: * @throws SQLException if the tid does not correspond to the current
3337: * transaction id of this thread or if the abort throws a
3338: * SQLException
3339: */
3340: public void notifyAbort(long tid) throws SQLException {
3341: synchronized (this ) {
3342: if ((!transactionStarted) || (currentTid != tid))
3343: throw new SQLException(
3344: "Cannot abort transaction "
3345: + tid
3346: + " since current worker thread is assigned to transaction "
3347: + currentTid);
3348:
3349: transactionHasAborted = true;
3350: }
3351: }
3352:
3353: /**
3354: * Notify the closing of the given persistent connection which should match
3355: * the current persistent connection id of this thread else an exception will
3356: * be thrown.
3357: *
3358: * @param tid the persistent connection identifier to abort
3359: */
3360: public void notifyClose(long persistentConnectionId) {
3361: synchronized (this ) {
3362: if ((!persistentConnection)
3363: || (this .persistentConnectionId != persistentConnectionId))
3364: logger
3365: .warn("Cannot notify closing of persistent connection "
3366: + persistentConnectionId
3367: + " since current worker thread is assigned to persistent connection "
3368: + this .persistentConnectionId);
3369:
3370: connectionHasClosed = true;
3371: }
3372: }
3373:
3374: /**
3375: * Retrieve general information on this client
3376: *
3377: * @return an array of string
3378: */
3379: public String[] retrieveClientData() {
3380: String[] data = new String[4];
3381: data[0] = in.getSocket().getInetAddress().getHostName();
3382: data[1] = in.getSocket().getInetAddress().getHostAddress();
3383: data[2] = String.valueOf(((System.currentTimeMillis() - in
3384: .getDateCreated()) / 1000));
3385: return data;
3386: }
3387:
3388: /**
3389: * Shutdown this thread by setting <code>isKilled</code> value to true. This
3390: * gives time to check for needed rollback transactions
3391: */
3392: public void shutdown() {
3393: // Tell this thread to stop working gently.
3394: // This will cancel transaction if needed
3395: this .isKilled = true;
3396: try {
3397: if (waitForCommand) {
3398: // close only the streams if we're not in the middle of a request
3399: in.close();
3400: out.close();
3401: }
3402: } catch (IOException e) {
3403: // ignore, only the input stream should be close
3404: // for this thread to end
3405: }
3406: }
3407:
3408: }
|