Source Code Cross Referenced for VirtualDatabaseWorkerThread.java in » Database-JDBC-Connection-Pool » sequoia-2.10.9 » org » continuent » sequoia » controller » virtualdatabase » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » sequoia 2.10.9 » org.continuent.sequoia.controller.virtualdatabase 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * Sequoia: Database clustering technology.
0003:         * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004:         * Science And Control (INRIA).
0005:         * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006:         * Copyright (C) 2005-2006 Continuent, Inc.
0007:         * Contact: sequoia@continuent.org
0008:         * 
0009:         * Licensed under the Apache License, Version 2.0 (the "License");
0010:         * you may not use this file except in compliance with the License.
0011:         * You may obtain a copy of the License at
0012:         * 
0013:         * http://www.apache.org/licenses/LICENSE-2.0
0014:         * 
0015:         * Unless required by applicable law or agreed to in writing, software
0016:         * distributed under the License is distributed on an "AS IS" BASIS,
0017:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018:         * See the License for the specific language governing permissions and
0019:         * limitations under the License. 
0020:         *
0021:         * Initial developer(s): Emmanuel Cecchet.
0022:         * Contributor(s): Nicolas Modrzyk, Jean-Bernard van Zuylen, Damian Arregui.
0023:         * Refactored by Marc Herbert to remove the use of Java serialization.
0024:         */package org.continuent.sequoia.controller.virtualdatabase;
0025:
0026:        import java.io.EOFException;
0027:        import java.io.IOException;
0028:        import java.io.Serializable;
0029:        import java.net.SocketException;
0030:        import java.sql.SQLException;
0031:        import java.sql.SQLWarning;
0032:        import java.util.ArrayList;
0033:        import java.util.HashMap;
0034:        import java.util.Iterator;
0035:        import java.util.List;
0036:
0037:        import org.continuent.sequoia.common.exceptions.BadJDBCApiUsageException;
0038:        import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0039:        import org.continuent.sequoia.common.exceptions.NoMoreControllerException;
0040:        import org.continuent.sequoia.common.exceptions.NotImplementedException;
0041:        import org.continuent.sequoia.common.exceptions.ProtocolException;
0042:        import org.continuent.sequoia.common.exceptions.VDBisShuttingDownException;
0043:        import org.continuent.sequoia.common.exceptions.driver.protocol.BackendDriverException;
0044:        import org.continuent.sequoia.common.exceptions.driver.protocol.ControllerCoreException;
0045:        import org.continuent.sequoia.common.i18n.Translate;
0046:        import org.continuent.sequoia.common.log.Trace;
0047:        import org.continuent.sequoia.common.protocol.Commands;
0048:        import org.continuent.sequoia.common.protocol.SQLDataSerialization;
0049:        import org.continuent.sequoia.common.protocol.TypeTag;
0050:        import org.continuent.sequoia.common.protocol.SQLDataSerialization.Serializer;
0051:        import org.continuent.sequoia.common.sql.Request;
0052:        import org.continuent.sequoia.common.sql.RequestWithResultSetParameters;
0053:        import org.continuent.sequoia.common.sql.metadata.MetadataContainer;
0054:        import org.continuent.sequoia.common.sql.metadata.MetadataDescription;
0055:        import org.continuent.sequoia.common.sql.schema.DatabaseProcedureSemantic;
0056:        import org.continuent.sequoia.common.stream.DriverBufferedInputStream;
0057:        import org.continuent.sequoia.common.stream.DriverBufferedOutputStream;
0058:        import org.continuent.sequoia.common.users.VirtualDatabaseUser;
0059:        import org.continuent.sequoia.common.util.Constants;
0060:        import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0061:        import org.continuent.sequoia.controller.backend.result.ExecuteResult;
0062:        import org.continuent.sequoia.controller.backend.result.ExecuteUpdateResult;
0063:        import org.continuent.sequoia.controller.backend.result.GeneratedKeysResult;
0064:        import org.continuent.sequoia.controller.core.Controller;
0065:        import org.continuent.sequoia.controller.core.ControllerConstants;
0066:        import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
0067:        import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
0068:        import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
0069:        import org.continuent.sequoia.controller.requests.AbstractRequest;
0070:        import org.continuent.sequoia.controller.requests.AbstractWriteRequest;
0071:        import org.continuent.sequoia.controller.requests.RequestFactory;
0072:        import org.continuent.sequoia.controller.requests.SelectRequest;
0073:        import org.continuent.sequoia.controller.requests.StoredProcedure;
0074:        import org.continuent.sequoia.controller.requests.StoredProcedureCallResult;
0075:        import org.continuent.sequoia.controller.requests.UnknownWriteRequest;
0076:        import org.continuent.sequoia.controller.scheduler.AbstractScheduler;
0077:        import org.continuent.sequoia.driver.Connection;
0078:
0079:        /**
0080:         * This class handles a connection with a Sequoia driver.
0081:         * 
0082:         * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0083:         * @author <a href="mailto:Nicolas.Modrzyk@inria.fr">Nicolas Modrzyk </a>
0084:         * @author <a href="mailto:Marc.Herbert@emicnetworks.com">Marc Herbert </a>
0085:         * @author <a href="mailto:jbvanzuylen@transwide.com">Jean-Bernard van Zuylen
0086:         *         </a>
0087:         * @author <a href="mailto:damian.arregui@continuent.com">Damian Arregui </a>
0088:         * @version 2.0
0089:         */
0090:        public class VirtualDatabaseWorkerThread extends Thread {
0091:            //
0092:            // How is the code organized?
0093:            //
0094:            // 1. Member variables
0095:            // 2. Constructor(s)
0096:            // 3. Request management
0097:            // 4. Getter/Setters
0098:
0099:            /** <code>true</code> if this has been killed. */
0100:            private boolean isKilled = false;
0101:
0102:            /** Virtual database instantiating this thread. */
0103:            private VirtualDatabase vdb;
0104:
0105:            /** Logger instance. */
0106:            private Trace logger = null;
0107:
0108:            private DriverBufferedInputStream in = null; // input stream of the connection being served, taken from the vdb pending queue in run()
0109:            private DriverBufferedOutputStream out = null; // output stream of the connection being served, taken from the vdb pending queue in run()
0110:
0111:            private VirtualDatabaseUser user; // built in run() from the login and password sent by the client
0112:
0113:            private Controller controller; // controller given to the constructor
0114:
0115:            private boolean waitForCommand; // set to true in run() before prompting for a command, false once the command has been read
0116:
0117:            private HashMap streamedResultSets; // open ResultSets for streaming; a new map is created at the start of run()
0118:
0119:            private RequestFactory requestFactory = ControllerConstants.CONTROLLER_FACTORY
0120:                    .getRequestFactory();
0121:            /**
0122:             * The following variables represent the state of the connection with the
0123:             * client
0124:             */
0125:            private boolean persistentConnection; // read from the client after a successful login
0126:            private long persistentConnectionId; // taken from vdb.getNextConnectionId() when persistentConnection is true
0127:            private boolean connectionHasClosed; // reset to false in run() for each new connection
0128:            private boolean retrieveSQLWarnings; // read from the client after a successful login
0129:            private long currentTid; // reset to 0 in run() for each new connection
0130:            private boolean transactionStarted; // reset to false in run() for each new connection
0131:            private boolean transactionHasAborted; // reset to false in run() for each new connection
0132:            private boolean queryExecutedInThisTransaction; // reset to false in run() for each new connection
0133:            private boolean writeQueryExecutedInThisTransaction; // reset to false in run() for each new connection
0134:            // Number of savepoints in the current transaction
0135:            private int hasSavepoint;
0136:            private String clientIpAddress; // client socket address as text, or a placeholder message when it cannot be fetched
0137:            private String login; // login name read from the client at connection time
0138:            private boolean closed; // false when a connection is picked up; set to true on I/O error during authentication; ends the command loop in run()
0139:            private int transactionIsolation = Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL;
0140:            private boolean isReadOnly = false;
0141:            private String connectionLineSeparator = null; // read from the client after a successful login
0142:
0143:            /* end user logger */
0144:            static Trace endUserLogger = Trace
0145:                    .getLogger("org.continuent.sequoia.enduser");
0146:
0147:            /*
0148:             * Constructor
0149:             */
0150:
0151:            /**
0152:             * Creates a new <code>VirtualDatabaseWorkerThread</code> instance.
0153:             * 
0154:             * @param controller the controller this thread originated from
0155:             * @param vdb the virtual database instantiating this thread.
0156:             */
0157:            public VirtualDatabaseWorkerThread(Controller controller,
0158:                    VirtualDatabase vdb) {
0159:                super ("VirtualDatabaseWorkerThread-"
0160:                        + vdb.getVirtualDatabaseName()); // thread name embeds the virtual database name
0161:                this .vdb = vdb;
0162:                this .controller = controller;
0163:                try { // prefer a logger named after this class and the virtual database
0164:                    this .logger = Trace
0165:                            .getLogger("org.continuent.sequoia.controller.virtualdatabase.VirtualDatabaseWorkerThread."
0166:                                    + vdb.getVirtualDatabaseName());
0167:                } catch (Exception e) { // logger lookup failed: reuse the virtual database's logger
0168:                    this .logger = vdb.logger;
0169:                }
0170:            }
0171:
0172:            //
0173:            // Decoding commands from the stream
0174:            //
0175:
0176:            /**
0177:             * Gets a connection from the connection queue and process it.
0178:             */
0179:            public void run() {
0180:                ArrayList vdbActiveThreads = vdb.getActiveThreads();
0181:                ArrayList vdbPendingQueue = vdb.getPendingConnections();
0182:                // List of open ResultSets for streaming. This is not synchronized since the
0183:                // connection only handles one request at a time
0184:                streamedResultSets = new HashMap();
0185:                boolean isActive = true;
0186:
0187:                if (vdbActiveThreads == null) {
0188:                    logger
0189:                            .error("Got null active threads queue in VirtualDatabaseWorkerThread");
0190:                    isKilled = true;
0191:                }
0192:                if (vdbPendingQueue == null) {
0193:                    logger
0194:                            .error("Got null connection queue in VirtualDatabaseWorkerThread");
0195:                    isKilled = true;
0196:                }
0197:
0198:                // Main loop
0199:                while (!isKilled) {
0200:                    // Get a connection from the pending queue
0201:                    synchronized (vdbPendingQueue) {
0202:                        while (vdbPendingQueue.isEmpty()) {
0203:                            if (!vdb.poolConnectionThreads) { // User does not want thread pooling, kill this thread!
0204:                                isKilled = true;
0205:                                break;
0206:                            }
0207:                            boolean timeout = false;
0208:                            try {
0209:                                if (isActive) {
0210:                                    isActive = false;
0211:                                    // Remove ourselves from the active thread list
0212:                                    synchronized (vdbActiveThreads) {
0213:                                        vdbActiveThreads.remove(this );
0214:                                        vdb.incrementIdleThreadCount();
0215:                                    }
0216:                                }
0217:                                long before = System.currentTimeMillis();
0218:                                vdbPendingQueue
0219:                                        .wait(vdb.getMaxThreadIdleTime());
0220:                                long now = System.currentTimeMillis();
0221:                                // Check if timeout has expired
0222:                                timeout = now - before >= vdb
0223:                                        .getMaxThreadIdleTime();
0224:                            } catch (InterruptedException e) {
0225:                                logger
0226:                                        .warn("VirtualDatabaseWorkerThread wait() interrupted");
0227:                            }
0228:                            if (timeout && vdbPendingQueue.isEmpty()) {
0229:                                if (vdb.currentNbOfThreads > vdb.minNbOfThreads) { // We have enough threads, kill this one
0230:                                    isKilled = true;
0231:                                    break;
0232:                                }
0233:                            }
0234:                        }
0235:
0236:                        if (isKilled) { // Cleaning up
0237:                            synchronized (vdbActiveThreads) { // Remove ourselves from the appropriate thread list
0238:                                if (isActive) {
0239:                                    vdbActiveThreads.remove(this );
0240:                                    vdb.decreaseCurrentNbOfThread();
0241:                                } else
0242:                                    vdb.decreaseIdleThread();
0243:                            }
0244:                            // Get out of the while loop
0245:                            continue;
0246:                        }
0247:
0248:                        // Get a connection
0249:                        try {
0250:                            in = (DriverBufferedInputStream) vdbPendingQueue
0251:                                    .remove(0);
0252:                            out = (DriverBufferedOutputStream) vdbPendingQueue
0253:                                    .remove(0);
0254:                        } catch (Exception e) {
0255:                            logger
0256:                                    .error("Error while getting streams from connection");
0257:                            continue;
0258:                        }
0259:
0260:                        synchronized (vdbActiveThreads) {
0261:                            if (!isActive) {
0262:                                vdb.decreaseIdleThread();
0263:                                isActive = true;
0264:                                // Add this thread to the active thread list
0265:                                vdbActiveThreads.add(this );
0266:                            }
0267:                        }
0268:                    }
0269:
0270:                    closed = false;
0271:
0272:                    // Handle connection
0273:                    // Read the user information and check authentication
0274:                    /**
0275:                     * @see org.continuent.sequoia.driver.Driver#connectToController(Properties,
0276:                     *      SequoiaUrl, ControllerInfo)
0277:                     */
0278:                    boolean success = false;
0279:                    try {
0280:                        login = in.readLongUTF();
0281:                        String password = in.readLongUTF();
0282:                        user = new VirtualDatabaseUser(login, password);
0283:
0284:                        // Pre-check for transparent login
0285:                        if (vdb.getAuthenticationManager()
0286:                                .isTransparentLoginEnabled()) {
0287:                            if (!vdb.getAuthenticationManager()
0288:                                    .isValidVirtualUser(user)) {
0289:                                vdb.checkAndAddVirtualDatabaseUser(user);
0290:                            }
0291:                        }
0292:
0293:                        if (vdb.getAuthenticationManager().isValidVirtualUser(
0294:                                user)) { // Authentication ok
0295:                            out.writeBoolean(true); // success code
0296:                            out.flush();
0297:                            success = true;
0298:                            try {
0299:                                clientIpAddress = in.getSocket()
0300:                                        .getInetAddress().toString();
0301:                            } catch (NullPointerException e) // no method above throws anything
0302:                            {
0303:                                clientIpAddress = "Unable to fetch client address";
0304:                            }
0305:
0306:                            if (logger.isDebugEnabled())
0307:                                logger.debug("Login accepted for " + login
0308:                                        + " from " + clientIpAddress);
0309:
0310:                            connectionLineSeparator = in.readLongUTF();
0311:                            persistentConnection = in.readBoolean();
0312:                            if (persistentConnection) {
0313:                                persistentConnectionId = vdb
0314:                                        .getNextConnectionId();
0315:                                try {
0316:                                    vdb.openPersistentConnection(login,
0317:                                            persistentConnectionId);
0318:                                    out.writeBoolean(true);
0319:                                    out.writeLong(persistentConnectionId);
0320:                                    out.flush();
0321:                                } catch (SQLException e) {
0322:                                    success = false;
0323:                                    out.writeBoolean(false);
0324:                                    out.flush();
0325:                                    continue;
0326:                                }
0327:                            }
0328:                            retrieveSQLWarnings = in.readBoolean();
0329:                        } else { // Authentication failed, close the connection
0330:                            String msg = "Authentication failed for user '"
0331:                                    + login + "'";
0332:                            out.writeBoolean(false); // authentication failed
0333:                            out.writeLongUTF(msg); // error message
0334:                            if (logger.isDebugEnabled())
0335:                                logger.debug(msg);
0336:                            endUserLogger.error(Translate.get(
0337:                                    "virtualdatabase.authentication.failed",
0338:                                    login));
0339:                            continue;
0340:                        }
0341:                    } catch (IOException e) {
0342:                        logger.error("I/O error during user authentication ("
0343:                                + e + ")");
0344:                        closed = true;
0345:                    } finally {
0346:                        if (!success) {
0347:                            try {
0348:                                out.close();
0349:                                in.close();
0350:                            } catch (IOException ignore) {
0351:                            }
0352:                        }
0353:                    }
0354:
0355:                    currentTid = 0;
0356:                    connectionHasClosed = false;
0357:                    transactionStarted = false;
0358:                    transactionHasAborted = false;
0359:                    transactionIsolation = Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL;
0360:                    queryExecutedInThisTransaction = false;
0361:                    writeQueryExecutedInThisTransaction = false;
0362:                    hasSavepoint = 0;
0363:
0364:                    int command = -1;
0365:                    while (!closed && !isKilled) {
0366:                        try {
0367:                            // Get the query
0368:                            waitForCommand = true;
0369:                            out.writeInt(Commands.ControllerPrompt);
0370:                            out.flush();
0371:                            if (Commands.CommandPrefix != in.readInt()) {
0372:                                logger.error("Protocol corruption with client "
0373:                                        + login + ", last command was:"
0374:                                        + command + ". Closing.");
0375:                                // FIXME: because of the protocol corruption, this has very little
0376:                                // chance to actually close the connection. We need something more
0377:                                // rude here, like shutting down ourselves.
0378:                                command = Commands.Close;
0379:                            } else {
0380:                                try {
0381:                                    command = in.readInt();
0382:                                } catch (NullPointerException e) {
0383:                                    // SEQUOIA-777: this NPE happens when this thread is blocked
0384:                                    // on in.read() and 'in' gets closed either explicitly
0385:                                    // by the shutdown thread or by an unexpected client socket death.
0386:                                    // The NPE is a known issue with jdk < 1.5, see SEQUOIA-777.
0387:                                    // In the shutdown case, the flag isKilled is set
0388:                                    // and we can exit the while loop. Otherwise, the client socket
0389:                                    // died unexpectedly and we consider this is a CLOSE.
0390:                                    if (isKilled)
0391:                                        continue;
0392:                                    logger
0393:                                            .warn("Client unexpectedly dropped connection. Closing.");
0394:                                    command = Commands.Close;
0395:                                }
0396:                            }
0397:
0398:                            waitForCommand = false;
0399:
0400:                            // Process it
0401:                            switch (command) {
0402:                            case Commands.StatementExecuteQuery:
0403:                                statementExecuteQuery(null);
0404:                                break;
0405:                            case Commands.StatementExecuteUpdate:
0406:                                statementExecuteUpdate(null);
0407:                                break;
0408:                            case Commands.StatementExecuteUpdateWithKeys:
0409:                                statementExecuteUpdateWithKeys();
0410:                                break;
0411:                            case Commands.CallableStatementExecuteQuery:
0412:                                callableStatementExecuteQuery(null, false);
0413:                                break;
0414:                            case Commands.CallableStatementExecuteUpdate:
0415:                                callableStatementExecuteUpdate(null, false);
0416:                                break;
0417:                            case Commands.CallableStatementExecute:
0418:                                callableStatementExecute(null, false);
0419:                                break;
0420:                            case Commands.CallableStatementExecuteQueryWithParameters:
0421:                                callableStatementExecuteQuery(null, true);
0422:                                break;
0423:                            case Commands.CallableStatementExecuteUpdateWithParameters:
0424:                                callableStatementExecuteUpdate(null, true);
0425:                                break;
0426:                            case Commands.CallableStatementExecuteWithParameters:
0427:                                callableStatementExecute(null, true);
0428:                                break;
0429:                            case Commands.StatementExecute:
0430:                                statementExecute(null);
0431:                                break;
0432:                            case Commands.Begin:
0433:                                begin();
0434:                                break;
0435:                            case Commands.Commit:
0436:                                commit();
0437:                                break;
0438:                            case Commands.Rollback:
0439:                                rollback();
0440:                                break;
0441:                            case Commands.SetNamedSavepoint:
0442:                                setNamedSavepoint();
0443:                                break;
0444:                            case Commands.SetUnnamedSavepoint:
0445:                                setUnnamedSavepoint();
0446:                                break;
0447:                            case Commands.ReleaseSavepoint:
0448:                                releaseSavepoint();
0449:                                break;
0450:                            case Commands.RollbackToSavepoint:
0451:                                rollbackToSavepoint();
0452:                                break;
0453:                            case Commands.SetTransactionIsolation:
0454:                                connectionSetTransactionIsolation();
0455:                                break;
0456:                            case Commands.SetReadOnly:
0457:                                connectionSetReadOnly();
0458:                                break;
0459:                            case Commands.ConnectionGetWarnings:
0460:                                connectionGetWarnings();
0461:                                break;
0462:                            case Commands.ConnectionClearWarnings:
0463:                                connectionClearWarnings();
0464:                                break;
0465:                            case Commands.GetVirtualDatabaseName:
0466:                                getVirtualDatabaseName();
0467:                                break;
0468:                            case Commands.DatabaseMetaDataGetDatabaseProductName:
0469:                                databaseMetaDataGetDatabaseProductName();
0470:                                break;
0471:                            case Commands.GetControllerVersionNumber:
0472:                                getControllerVersionNumber();
0473:                                break;
0474:                            case Commands.DatabaseMetaDataGetTables:
0475:                                databaseMetaDataGetTables();
0476:                                break;
0477:                            case Commands.DatabaseMetaDataGetColumns:
0478:                                databaseMetaDataGetColumns();
0479:                                break;
0480:                            case Commands.DatabaseMetaDataGetPrimaryKeys:
0481:                                databaseMetaDataGetPrimaryKeys();
0482:                                break;
0483:                            case Commands.DatabaseMetaDataGetProcedures:
0484:                                databaseMetaDataGetProcedures();
0485:                                break;
0486:                            case Commands.DatabaseMetaDataGetProcedureColumns:
0487:                                databaseMetaDataGetProcedureColumns();
0488:                                break;
0489:                            case Commands.ConnectionGetCatalogs:
0490:                                connectionGetCatalogs();
0491:                                break;
0492:                            case Commands.ConnectionGetCatalog:
0493:                                connectionGetCatalog();
0494:                                break;
0495:                            case Commands.DatabaseMetaDataGetTableTypes:
0496:                                databaseMetaDataGetTableTypes();
0497:                                break;
0498:                            case Commands.DatabaseMetaDataGetSchemas:
0499:                                databaseMetaDataGetSchemas();
0500:                                break;
0501:                            case Commands.DatabaseMetaDataGetTablePrivileges:
0502:                                databaseMetaDataGetTablePrivileges();
0503:                                break;
0504:                            case Commands.DatabaseMetaDataGetAttributes:
0505:                                databaseMetaDataGetAttributes();
0506:                                break;
0507:                            case Commands.DatabaseMetaDataGetBestRowIdentifier:
0508:                                databaseMetaDataGetBestRowIdentifier();
0509:                                break;
0510:                            case Commands.DatabaseMetaDataGetColumnPrivileges:
0511:                                databaseMetaDataGetColumnPrivileges();
0512:                                break;
0513:                            case Commands.DatabaseMetaDataGetCrossReference:
0514:                                databaseMetaDataGetCrossReference();
0515:                                break;
0516:                            case Commands.DatabaseMetaDataGetExportedKeys:
0517:                                databaseMetaDataGetExportedKeys();
0518:                                break;
0519:                            case Commands.DatabaseMetaDataGetImportedKeys:
0520:                                databaseMetaDataGetImportedKeys();
0521:                                break;
0522:                            case Commands.DatabaseMetaDataGetIndexInfo:
0523:                                databaseMetaDataGetIndexInfo();
0524:                                break;
0525:                            case Commands.DatabaseMetaDataGetSuperTables:
0526:                                databaseMetaDataGetSuperTables();
0527:                                break;
0528:                            case Commands.DatabaseMetaDataGetSuperTypes:
0529:                                databaseMetaDataGetSuperTypes();
0530:                                break;
0531:                            case Commands.DatabaseMetaDataGetTypeInfo:
0532:                                databaseMetaDataGetTypeInfo();
0533:                                break;
0534:                            case Commands.DatabaseMetaDataGetUDTs:
0535:                                databaseMetaDataGetUDTs();
0536:                                break;
0537:                            case Commands.DatabaseMetaDataGetVersionColumns:
0538:                                databaseMetaDataGetVersionColumns();
0539:                                break;
0540:                            case Commands.PreparedStatementGetMetaData:
0541:                                preparedStatementGetMetaData();
0542:                                break;
0543:                            case Commands.ConnectionSetCatalog:
0544:                                connectionSetCatalog();
0545:                                break;
0546:                            case Commands.Close:
0547:                                close();
0548:                                break;
0549:                            case Commands.Reset:
0550:                                reset();
0551:                                break;
0552:                            case Commands.FetchNextResultSetRows:
0553:                                fetchNextResultSetRows();
0554:                                break;
0555:                            case Commands.CloseRemoteResultSet:
0556:                                closeRemoteResultSet();
0557:                                break;
0558:                            case Commands.DatabaseStaticMetadata:
0559:                                databaseStaticMetadata();
0560:                                break;
0561:                            case Commands.RestoreConnectionState:
0562:                                restoreConnectionState();
0563:                                break;
0564:                            case Commands.RetrieveExecuteQueryResult:
0565:                                retrieveExecuteQueryResult();
0566:                                break;
0567:                            case Commands.RetrieveExecuteResult:
0568:                                retrieveExecuteResult();
0569:                                break;
0570:                            case Commands.RetrieveExecuteUpdateResult:
0571:                                retrieveExecuteUpdateResult();
0572:                                break;
0573:                            case Commands.RetrieveExecuteUpdateWithKeysResult:
0574:                                retrieveExecuteUpdateWithKeysResult();
0575:                                break;
0576:                            case Commands.RetrieveExecuteQueryResultWithParameters:
0577:                                retrieveExecuteQueryResultWithParameters();
0578:                                break;
0579:                            case Commands.RetrieveExecuteUpdateResultWithParameters:
0580:                                retrieveExecuteUpdateResultWithParameters();
0581:                                break;
0582:                            case Commands.RetrieveExecuteResultWithParameters:
0583:                                retrieveExecuteResultWithParameters();
0584:                                break;
0585:                            case Commands.RetrieveCommitResult:
0586:                                retrieveCommitResult();
0587:                                break;
0588:                            case Commands.RetrieveRollbackResult:
0589:                                retrieveRollbackResult();
0590:                                break;
0591:                            case Commands.RetrieveReleaseSavepoint:
0592:                                retrieveReleaseSavepoint();
0593:                                break;
0594:                            default:
0595:                                String errorMsg = "Unsupported protocol command: "
0596:                                        + command;
0597:                                logger.error(errorMsg);
0598:                                sendToDriver(new RuntimeException(errorMsg));
0599:                                break;
0600:                            }
0601:                        } catch (EOFException e) {
0602:                            logger.warn("Client (login:"
0603:                                    + login
0604:                                    + ",host:"
0605:                                    + in.getSocket().getInetAddress()
0606:                                            .getHostName()
0607:                                    + " closed connection with server)");
0608:                            closed = true;
0609:                        } catch (SocketException e) {
0610:                            // shutting down
0611:                            closed = true;
0612:                        } catch (IOException e) {
0613:                            closed = true;
0614:                            logger.warn("Closing connection with client "
0615:                                    + login + " because of IOException.(" + e
0616:                                    + ")");
0617:                        } catch (VDBisShuttingDownException e) {
0618:                            isKilled = true;
0619:                        } catch (SQLException e) {
0620:                            logger.warn("Error during command execution ("
0621:                                    + e.getMessage() + ")");
0622:                            if (transactionStarted && !transactionHasAborted) { // Failure of a query within a transaction automatically aborts the
0623:                                // transaction
0624:                                transactionHasAborted = (hasSavepoint == 0)
0625:                                        && ((command == Commands.StatementExecuteUpdate)
0626:                                                || (command == Commands.StatementExecuteUpdateWithKeys)
0627:                                                || (command == Commands.StatementExecute)
0628:                                                || (command == Commands.CallableStatementExecuteWithParameters)
0629:                                                || (command == Commands.CallableStatementExecuteQueryWithParameters)
0630:                                                || (command == Commands.CallableStatementExecuteUpdateWithParameters)
0631:                                                || (command == Commands.CallableStatementExecuteQuery) || (command == Commands.CallableStatementExecuteUpdate));
0632:                            }
0633:                            try {
0634:                                sendToDriver(e);
0635:                            } catch (IOException ignore) {
0636:                            }
0637:                        } catch (BadJDBCApiUsageException e) {
0638:                            logger.warn("Error during command execution ("
0639:                                    + e.getMessage() + ")");
0640:                            try {
0641:                                sendToDriver(e);
0642:                            } catch (IOException ignore) {
0643:                            }
0644:                        } catch (Throwable e) {
0645:                            logger.warn(
0646:                                    "Runtime error during command execution ("
0647:                                            + e.getMessage() + ")", e);
0648:                            if (transactionStarted) { // Failure of a query within a transaction automatically aborts the
0649:                                // transaction
0650:                                transactionHasAborted = (hasSavepoint == 0)
0651:                                        && ((command == Commands.StatementExecuteQuery)
0652:                                                || (command == Commands.StatementExecuteUpdate)
0653:                                                || (command == Commands.StatementExecuteUpdateWithKeys)
0654:                                                || (command == Commands.StatementExecute)
0655:                                                || (command == Commands.CallableStatementExecute)
0656:                                                || (command == Commands.CallableStatementExecuteQuery) || (command == Commands.CallableStatementExecuteUpdate));
0657:                            }
0658:                            try {
0659:                                sendToDriver((SQLException) new SQLException(e
0660:                                        .getLocalizedMessage()).initCause(e));
0661:                            } catch (IOException ignore) {
0662:                            }
0663:                        }
0664:                    } // while (!closed && !isKilled) get and process command from driver
0665:
0666:                    // Do the cleanup
0667:                    if (!streamedResultSets.isEmpty()) {
0668:                        for (Iterator iter = streamedResultSets.values()
0669:                                .iterator(); iter.hasNext();) {
0670:                            ControllerResultSet crs = (ControllerResultSet) iter
0671:                                    .next();
0672:                            crs.closeResultSet();
0673:                        }
0674:                        streamedResultSets.clear();
0675:                    }
0676:
0677:                    if (!isKilled) {
0678:                        // Abort in-flight transaction
0679:                        if (transactionStarted && !transactionHasAborted) {
0680:                            if (logger.isDebugEnabled())
0681:                                logger.debug("Aborting transaction "
0682:                                        + currentTid);
0683:                            try {
0684:                                vdb.abort(currentTid,
0685:                                        writeQueryExecutedInThisTransaction,
0686:                                        true);
0687:                            } catch (Throwable e) {
0688:                                if (logger.isWarnEnabled())
0689:                                    logger
0690:                                            .warn("Error during abort of transaction "
0691:                                                    + currentTid
0692:                                                    + "("
0693:                                                    + e
0694:                                                    + ")");
0695:                            }
0696:                        }
0697:
0698:                        // Close persistent connections
0699:                        if (persistentConnection) {
0700:                            vdb.closePersistentConnection(login,
0701:                                    persistentConnectionId);
0702:                        }
0703:                    } else {
0704:                        // FIXME: debug message for safe mode parallel shutdown of controllers
0705:                        // (note that parallel shutdown should be avoided)
0706:                        if (logger.isInfoEnabled()) {
0707:                            logger
0708:                                    .info("VirtualDatabaseWorkerThread killed by shutdown, no clean-up"
0709:                                            + " done. Number of pending transaction in scheduler: "
0710:                                            + vdb.getRequestManager()
0711:                                                    .getScheduler()
0712:                                                    .getPendingTransactions());
0713:                        }
0714:                    }
0715:
0716:                    // Close streams and underlying socket
0717:                    try {
0718:                        in.close();
0719:                    } catch (IOException ignore) {
0720:                    }
0721:                    try {
0722:                        out.close();
0723:                    } catch (IOException ignore) {
0724:                    }
0725:                }
0726:
0727:                synchronized (vdbActiveThreads) { // Remove ourselves from the appropriate thread list
0728:                    if (vdbActiveThreads.remove(this ))
0729:                        vdb.decreaseCurrentNbOfThread();
0730:                }
0731:
0732:                if (logger.isDebugEnabled())
0733:                    logger
0734:                            .debug("VirtualDatabaseWorkerThread associated to login: "
0735:                                    + this .getUser() + " terminating.");
0736:            }
0737:
0738:            private void close() throws IOException {
0739:                if (logger.isDebugEnabled())
0740:                    logger.debug("Close command");
0741:
0742:                cleanup();
0743:
0744:                sendToDriver(true);
0745:
0746:                closed = true;
0747:            }
0748:
0749:            private void closeRemoteResultSet() throws IOException {
0750:                if (logger.isDebugEnabled())
0751:                    logger.debug("CloseRemoteResultSet command");
0752:
0753:                String cursor = in.readLongUTF();
0754:                ControllerResultSet crsToClose = (ControllerResultSet) streamedResultSets
0755:                        .remove(cursor);
0756:                if (crsToClose == null) {
0757:                    sendToDriver(new SQLException(
0758:                            "No valid RemoteResultSet to close."));
0759:                } else {
0760:                    crsToClose.closeResultSet();
0761:                    sendToDriver(true);
0762:                }
0763:            }
0764:
0765:            private void reset() throws IOException {
0766:                // The client application has closed the connection but it is kept
0767:                // open in case the transparent connection pooling reuses it.
0768:                if (logger.isDebugEnabled())
0769:                    logger.debug("Reset command");
0770:
0771:                cleanup();
0772:
0773:                connectionHasClosed = false;
0774:                currentTid = 0;
0775:                transactionStarted = false;
0776:                transactionHasAborted = false;
0777:                transactionIsolation = Connection.DEFAULT_TRANSACTION_ISOLATION_LEVEL;
0778:                queryExecutedInThisTransaction = false;
0779:                hasSavepoint = 0;
0780:                sendToDriver(true);
0781:            }
0782:
0783:            private void cleanup() {
0784:                // Do the cleanup
0785:                if (transactionStarted && !transactionHasAborted) {
0786:                    /*
0787:                     * We need to abort the begin to cleanup the metadata associated with the
0788:                     * started transaction.
0789:                     */
0790:                    if (logger.isDebugEnabled())
0791:                        logger.debug("Aborting transaction " + currentTid);
0792:                    try {
0793:                        vdb.abort(currentTid,
0794:                                writeQueryExecutedInThisTransaction, true);
0795:                    } catch (Exception e) {
0796:                        if (logger.isDebugEnabled())
0797:                            logger.debug("Error while aborting transaction "
0798:                                    + currentTid + "(" + e + ")", e);
0799:                    }
0800:                }
0801:            }
0802:
0803:            private void restoreConnectionState() throws IOException,
0804:                    SQLException {
0805:                if (logger.isDebugEnabled())
0806:                    logger.debug("RestoreConnectionState command");
0807:
0808:                // Re-connect has opened a new persistent connection that will not be used
0809:                if (persistentConnection) {
0810:                    vdb
0811:                            .closePersistentConnection(login,
0812:                                    persistentConnectionId);
0813:                }
0814:
0815:                writeQueryExecutedInThisTransaction = in.readBoolean();
0816:                // We receive autocommit from driver
0817:                transactionStarted = !in.readBoolean();
0818:                if (transactionStarted)
0819:                    currentTid = in.readLong();
0820:                persistentConnection = in.readBoolean();
0821:                if (persistentConnection)
0822:                    persistentConnectionId = in.readLong();
0823:
0824:                // Restore the persistent connection first (if any) before trying to perform
0825:                // any operation on the transaction
0826:                if (persistentConnection) {
0827:                    if (!vdb.hasPersistentConnection(persistentConnectionId)) {
0828:                        RecoveryLog recoveryLog = vdb.getRequestManager()
0829:                                .getRecoveryLog();
0830:                        if (!recoveryLog
0831:                                .findCloseForPersistentConnection(persistentConnectionId)) {
0832:                            vdb
0833:                                    .failoverForPersistentConnection(persistentConnectionId);
0834:                        } else {
0835:                            connectionHasClosed = true;
0836:                        }
0837:                    }
0838:                }
0839:
0840:                retrieveSQLWarnings = in.readBoolean();
0841:
0842:                // Acknowledge driver
0843:                out.writeBoolean(true);
0844:                out.flush();
0845:
0846:                if (transactionStarted) {
0847:                    try {
0848:                        // Check if the transaction exists here
0849:                        vdb.requestManager.getTransactionMetaData(new Long(
0850:                                currentTid));
0851:                        // Only notify failover if we have the transaction in our context
0852:                        vdb.failoverForTransaction(currentTid);
0853:                        /*
0854:                         * Transaction is started on this controller... it was either a
0855:                         * transaction that contained write statements, either a read-only
0856:                         * transaction with broadcasted statements, so we force
0857:                         * writeQueryExecutedInThisTransaction to true.
0858:                         */
0859:                        writeQueryExecutedInThisTransaction = true;
0860:                    } catch (SQLException e) {
0861:                        /*
0862:                         * Transaction has not been found because it either already
0863:                         * committed/rollbacked or it was not started (no request played so far
0864:                         * in the transaction or just read queries on the controller that has
0865:                         * failed). Check first if we can find a trace of commit/rollback in the
0866:                         * recovery log and if not start the transaction now. This is needed
0867:                         * only if it was a write transaction or a transaction with broadcasted
0868:                         * read requests.
0869:                         */
0870:                        RecoveryLog recoveryLog = vdb.getRequestManager()
0871:                                .getRecoveryLog();
0872:                        if (writeQueryExecutedInThisTransaction) {
0873:                            if (!recoveryLog
0874:                                    .findCommitForTransaction(currentTid)
0875:                                    && !recoveryLog
0876:                                            .findRollbackForTransaction(currentTid)) {
0877:                                vdb.requestManager.doBegin(login, currentTid,
0878:                                        persistentConnection,
0879:                                        persistentConnectionId);
0880:                            } else {
0881:                                // FIXME dirty overload of this flag semantics
0882:                                transactionHasAborted = true;
0883:                            }
0884:                        } else {
0885:                            vdb.requestManager.doBegin(login, currentTid,
0886:                                    persistentConnection,
0887:                                    persistentConnectionId);
0888:                            writeQueryExecutedInThisTransaction = true;
0889:                        }
0890:                    }
0891:                }
0892:            }
0893:
0894:            //
0895:            // Catalog
0896:            //
0897:
0898:            private void connectionSetCatalog() throws IOException {
0899:                // Warning! This could bypass the security checkings based on client IP
0900:                // address. If a user has access to a virtual database, through setCatalog()
0901:                // is will be able to access all other virtual databases where his
0902:                // login/password is valid regardless of the IP filtering settings.
0903:                if (logger.isDebugEnabled())
0904:                    logger.debug("ConnectionSetCatalog command");
0905:                String catalog = in.readLongUTF();
0906:                boolean change = controller.hasVirtualDatabase(catalog);
0907:                if (change) {
0908:                    VirtualDatabase tempvdb = controller
0909:                            .getVirtualDatabase(catalog);
0910:                    if (!tempvdb.getAuthenticationManager().isValidVirtualUser(
0911:                            user))
0912:                        sendToDriver(new SQLException(
0913:                                "User authentication has failed for asked catalog. No change"));
0914:                    else {
0915:                        this .vdb = tempvdb;
0916:                        sendToDriver(true);
0917:                    }
0918:                } else
0919:                    sendToDriver(false);
0920:
0921:            }
0922:
0923:            private void connectionGetCatalog() throws IOException {
0924:                if (logger.isDebugEnabled())
0925:                    logger.debug("ConnectionGetCatalog command");
0926:
0927:                sendToDriver(vdb.getVirtualDatabaseName());
0928:            }
0929:
0930:            private void connectionGetCatalogs() throws IOException {
0931:                if (logger.isDebugEnabled())
0932:                    logger.debug("ConnectionGetCatalogs command");
0933:                ArrayList list = controller.getVirtualDatabaseNames();
0934:                sendToDriver(vdb.getDynamicMetaData().getCatalogs(list));
0935:            }
0936:
0937:            private void connectionSetTransactionIsolation() throws IOException {
0938:                int level = in.readInt();
0939:                if (logger.isDebugEnabled())
0940:                    logger.debug("SetTransactionIsolation command (level="
0941:                            + level + ")");
0942:
0943:                // Check that we are not in a running transaction
0944:                if (transactionStarted && queryExecutedInThisTransaction) {
0945:                    sendToDriver(new SQLException(
0946:                            "Cannot change the transaction isolation in a running transaction"));
0947:                    return;
0948:                }
0949:
0950:                MetadataContainer metadataContainer = vdb.getStaticMetaData()
0951:                        .getMetadataContainer();
0952:                if (metadataContainer != null) {
0953:                    Object value = metadataContainer
0954:                            .get(MetadataContainer
0955:                                    .getContainerKey(
0956:                                            MetadataDescription.SUPPORTS_TRANSACTION_ISOLATION_LEVEL,
0957:                                            new Class[] { Integer.TYPE },
0958:                                            new Object[] { new Integer(level) }));
0959:
0960:                    if (value != null) {
0961:                        if (!((Boolean) value).booleanValue()) {
0962:                            sendToDriver(new SQLException(
0963:                                    "Transaction isolation level "
0964:                                            + level
0965:                                            + " is not supported by the database"));
0966:                            return;
0967:                        }
0968:                    } else
0969:                        logger
0970:                                .warn("Unable to check validity of transaction isolation level "
0971:                                        + level);
0972:                } else
0973:                    logger
0974:                            .warn("Unable to check validity of transaction isolation level "
0975:                                    + level);
0976:                transactionIsolation = level;
0977:                sendToDriver(true);
0978:            }
0979:
0980:            private void connectionSetReadOnly() throws IOException {
0981:                isReadOnly = in.readBoolean();
0982:                if (logger.isDebugEnabled())
0983:                    logger.debug("SetReadOnly command (value=" + true + ")");
0984:
0985:                sendToDriver(true);
0986:            }
0987:
0988:            private void connectionGetWarnings() throws IOException {
0989:                long persistentConnId = in.readLong();
0990:                try {
0991:                    sendToDriver(vdb.getConnectionWarnings(persistentConnId));
0992:                } catch (SQLException e) {
0993:                    sendToDriver(e);
0994:                }
0995:            }
0996:
0997:            private void connectionClearWarnings() throws IOException {
0998:                long persistentConnId = in.readLong();
0999:                try {
1000:                    vdb.clearConnectionWarnings(persistentConnId);
1001:                    sendToDriver(true);
1002:                } catch (SQLException e) {
1003:                    sendToDriver(e);
1004:                }
1005:            }
1006:
1007:            //
1008:            // Database MetaData
1009:            //
1010:
1011:            /**
1012:             * @see java.sql.DatabaseMetaData#getAttributes(java.lang.String,
1013:             *      java.lang.String, java.lang.String, java.lang.String)
1014:             */
1015:            private void databaseMetaDataGetAttributes() throws IOException {
1016:                if (logger.isDebugEnabled())
1017:                    logger.debug("DatabaseMetaDataGetAttributes command");
1018:                String catalog = in.readLongUTF();
1019:                String schemaPattern = in.readLongUTF();
1020:                String typeNamePattern = in.readLongUTF();
1021:                String attributeNamePattern = in.readLongUTF();
1022:
1023:                try {
1024:                    sendToDriver(vdb.getDynamicMetaData().getAttributes(
1025:                            new ConnectionContext(login, transactionStarted,
1026:                                    currentTid, persistentConnection,
1027:                                    persistentConnectionId), catalog,
1028:                            schemaPattern, typeNamePattern,
1029:                            attributeNamePattern));
1030:                } catch (SQLException e) {
1031:                    if (logger.isWarnEnabled())
1032:                        logger
1033:                                .warn(
1034:                                        "Error while calling databaseMetaDataGetAttributes",
1035:                                        e);
1036:                    sendToDriver(e);
1037:                }
1038:            }
1039:
1040:            /**
1041:             * @see java.sql.DatabaseMetaData#getBestRowIdentifier(java.lang.String,
1042:             *      java.lang.String, java.lang.String, int, boolean)
1043:             */
1044:            private void databaseMetaDataGetBestRowIdentifier()
1045:                    throws IOException {
1046:                if (logger.isDebugEnabled())
1047:                    logger
1048:                            .debug("DatabaseMetaDataGetBestRowIdentifier command");
1049:
1050:                String catalog = in.readLongUTF();
1051:                String schema = in.readLongUTF();
1052:                String table = in.readLongUTF();
1053:                int scope = in.readInt();
1054:                boolean nullable = in.readBoolean();
1055:
1056:                try {
1057:                    sendToDriver(vdb.getDynamicMetaData().getBestRowIdentifier(
1058:                            new ConnectionContext(login, transactionStarted,
1059:                                    currentTid, persistentConnection,
1060:                                    persistentConnectionId), catalog, schema,
1061:                            table, scope, nullable));
1062:                } catch (SQLException e) {
1063:                    if (logger.isWarnEnabled())
1064:                        logger
1065:                                .warn(
1066:                                        "Error while calling databaseMetaDataGetBestRowIdentifier",
1067:                                        e);
1068:                    sendToDriver(e);
1069:                }
1070:            }
1071:
1072:            /**
1073:             * @see java.sql.DatabaseMetaData#getColumnPrivileges(java.lang.String,
1074:             *      java.lang.String, java.lang.String, java.lang.String)
1075:             */
1076:            private void databaseMetaDataGetColumnPrivileges()
1077:                    throws IOException {
1078:                if (logger.isDebugEnabled())
1079:                    logger.debug("DatabaseMetaDataGetColumnPrivileges command");
1080:
1081:                String catalog = in.readLongUTF();
1082:                String schema = in.readLongUTF();
1083:                String table = in.readLongUTF();
1084:                String columnNamePattern = in.readLongUTF();
1085:
1086:                try {
1087:                    sendToDriver(vdb.getDynamicMetaData().getColumnPrivileges(
1088:                            new ConnectionContext(login, transactionStarted,
1089:                                    currentTid, persistentConnection,
1090:                                    persistentConnectionId), catalog, schema,
1091:                            table, columnNamePattern));
1092:                } catch (SQLException e) {
1093:                    if (logger.isWarnEnabled())
1094:                        logger
1095:                                .warn(
1096:                                        "Error while calling databaseMetaDataGetColumnPrivileges",
1097:                                        e);
1098:                    sendToDriver(e);
1099:                }
1100:            }
1101:
1102:            /**
1103:             * @see java.sql.DatabaseMetaData#getColumns(java.lang.String,
1104:             *      java.lang.String, java.lang.String, java.lang.String)
1105:             */
1106:            private void databaseMetaDataGetColumns() throws IOException {
1107:                if (logger.isDebugEnabled())
1108:                    logger.debug("DatabaseMetaDataGetColumns command");
1109:                String ccatalog = in.readLongUTF();
1110:                String cschemaPattern = in.readLongUTF();
1111:                String ctableNamePattern = in.readLongUTF();
1112:                String ccolumnNamePattern = in.readLongUTF();
1113:
1114:                try {
1115:                    sendToDriver(vdb.getDynamicMetaData().getColumns(
1116:                            new ConnectionContext(login, transactionStarted,
1117:                                    currentTid, persistentConnection,
1118:                                    persistentConnectionId), ccatalog,
1119:                            cschemaPattern, ctableNamePattern,
1120:                            ccolumnNamePattern));
1121:                } catch (SQLException e) {
1122:                    if (logger.isWarnEnabled())
1123:                        logger
1124:                                .warn(
1125:                                        "Error while calling databaseMetaDataGetColumns",
1126:                                        e);
1127:                    sendToDriver(e);
1128:                }
1129:            }
1130:
1131:            /**
1132:             * @see java.sql.DatabaseMetaData#getCrossReference(java.lang.String,
1133:             *      java.lang.String, java.lang.String, java.lang.String,
1134:             *      java.lang.String, java.lang.String)
1135:             */
1136:            private void databaseMetaDataGetCrossReference() throws IOException {
1137:                if (logger.isDebugEnabled())
1138:                    logger.debug("DatabaseMetaDataGetCrossReference command");
1139:
1140:                String primaryCatalog = in.readLongUTF();
1141:                String primarySchema = in.readLongUTF();
1142:                String primaryTable = in.readLongUTF();
1143:                String foreignCatalog = in.readLongUTF();
1144:                String foreignSchema = in.readLongUTF();
1145:                String foreignTable = in.readLongUTF();
1146:
1147:                try {
1148:                    sendToDriver(vdb.getDynamicMetaData().getCrossReference(
1149:                            new ConnectionContext(login, transactionStarted,
1150:                                    currentTid, persistentConnection,
1151:                                    persistentConnectionId), primaryCatalog,
1152:                            primarySchema, primaryTable, foreignCatalog,
1153:                            foreignSchema, foreignTable));
1154:                } catch (SQLException e) {
1155:                    if (logger.isWarnEnabled())
1156:                        logger
1157:                                .warn(
1158:                                        "Error while calling databaseMetaDataGetCrossReference",
1159:                                        e);
1160:                    sendToDriver(e);
1161:                }
1162:            }
1163:
1164:            /**
1165:             * @see java.sql.DatabaseMetaData#getDatabaseProductName()
1166:             */
1167:            private void databaseMetaDataGetDatabaseProductName()
1168:                    throws IOException {
1169:                if (logger.isDebugEnabled())
1170:                    logger.debug("GetDatabaseProductName command");
1171:
1172:                sendToDriver(vdb.getDatabaseProductName());
1173:            }
1174:
1175:            /**
1176:             * @see java.sql.DatabaseMetaData#getExportedKeys(java.lang.String,
1177:             *      java.lang.String, java.lang.String)
1178:             */
1179:            private void databaseMetaDataGetExportedKeys() throws IOException {
1180:                if (logger.isDebugEnabled())
1181:                    logger.debug("DatabaseMetaDataGetExportedKeys command");
1182:
1183:                String catalog = in.readLongUTF();
1184:                String schema = in.readLongUTF();
1185:                String table = in.readLongUTF();
1186:
1187:                try {
1188:                    sendToDriver(vdb.getDynamicMetaData().getExportedKeys(
1189:                            new ConnectionContext(login, transactionStarted,
1190:                                    currentTid, persistentConnection,
1191:                                    persistentConnectionId), catalog, schema,
1192:                            table));
1193:                } catch (SQLException e) {
1194:                    if (logger.isWarnEnabled())
1195:                        logger
1196:                                .warn(
1197:                                        "Error while calling databaseMetaDataGetExportedKeys",
1198:                                        e);
1199:                    sendToDriver(e);
1200:                }
1201:            }
1202:
1203:            /**
1204:             * @see java.sql.DatabaseMetaData#getImportedKeys(java.lang.String,
1205:             *      java.lang.String, java.lang.String)
1206:             */
1207:            private void databaseMetaDataGetImportedKeys() throws IOException {
1208:                if (logger.isDebugEnabled())
1209:                    logger.debug("DatabaseMetaDataGetImportedKeys command");
1210:
1211:                String catalog = in.readLongUTF();
1212:                String schema = in.readLongUTF();
1213:                String table = in.readLongUTF();
1214:
1215:                try {
1216:                    sendToDriver(vdb.getDynamicMetaData().getImportedKeys(
1217:                            new ConnectionContext(login, transactionStarted,
1218:                                    currentTid, persistentConnection,
1219:                                    persistentConnectionId), catalog, schema,
1220:                            table));
1221:                } catch (SQLException e) {
1222:                    if (logger.isWarnEnabled())
1223:                        logger
1224:                                .warn(
1225:                                        "Error while calling databaseMetaDataGetImportedKeys",
1226:                                        e);
1227:                    sendToDriver(e);
1228:                }
1229:            }
1230:
1231:            /**
1232:             * @see java.sql.DatabaseMetaData#getIndexInfo(java.lang.String,
1233:             *      java.lang.String, java.lang.String, boolean, boolean)
1234:             */
1235:            private void databaseMetaDataGetIndexInfo() throws IOException {
1236:                if (logger.isDebugEnabled())
1237:                    logger.debug("databaseMetaDataGetIndexInfo command");
1238:
1239:                String catalog = in.readLongUTF();
1240:                String schema = in.readLongUTF();
1241:                String table = in.readLongUTF();
1242:                boolean unique = in.readBoolean();
1243:                boolean approximate = in.readBoolean();
1244:
1245:                try {
1246:                    sendToDriver(vdb.getDynamicMetaData().getIndexInfo(
1247:                            new ConnectionContext(login, transactionStarted,
1248:                                    currentTid, persistentConnection,
1249:                                    persistentConnectionId), catalog, schema,
1250:                            table, unique, approximate));
1251:                } catch (SQLException e) {
1252:                    if (logger.isWarnEnabled())
1253:                        logger
1254:                                .warn(
1255:                                        "Error while calling databaseMetaDataGetIndexInfo",
1256:                                        e);
1257:                    sendToDriver(e);
1258:                }
1259:            }
1260:
1261:            /**
1262:             * @see java.sql.DatabaseMetaData#getPrimaryKeys(java.lang.String,
1263:             *      java.lang.String, java.lang.String)
1264:             */
1265:            private void databaseMetaDataGetPrimaryKeys() throws IOException {
1266:                if (logger.isDebugEnabled())
1267:                    logger.debug("DatabaseMetaDataGetPrimaryKeys command");
1268:
1269:                String pcatalog = in.readLongUTF();
1270:                String pschemaPattern = in.readLongUTF();
1271:                String ptableNamePattern = in.readLongUTF();
1272:
1273:                try {
1274:                    sendToDriver(vdb.getDynamicMetaData().getPrimaryKeys(
1275:                            new ConnectionContext(login, transactionStarted,
1276:                                    currentTid, persistentConnection,
1277:                                    persistentConnectionId), pcatalog,
1278:                            pschemaPattern, ptableNamePattern));
1279:                } catch (SQLException e) {
1280:                    if (logger.isWarnEnabled())
1281:                        logger
1282:                                .warn(
1283:                                        "Error while calling databaseMetaDataGetPrimaryKeys",
1284:                                        e);
1285:                    sendToDriver(e);
1286:                }
1287:            }
1288:
1289:            /**
1290:             * @see java.sql.DatabaseMetaData#getProcedureColumns(java.lang.String,
1291:             *      java.lang.String, java.lang.String, java.lang.String)
1292:             */
1293:            private void databaseMetaDataGetProcedureColumns()
1294:                    throws IOException {
1295:                if (logger.isDebugEnabled())
1296:                    logger.debug("DatabaseMetaDataGetProcedureColumns command");
1297:
1298:                String pccatalog = in.readLongUTF();
1299:                String pcschemaPattern = in.readLongUTF();
1300:                String pcprocedureNamePattern = in.readLongUTF();
1301:                String pccolumnNamePattern = in.readLongUTF();
1302:
1303:                try {
1304:                    sendToDriver(vdb.getDynamicMetaData().getProcedureColumns(
1305:                            new ConnectionContext(login, transactionStarted,
1306:                                    currentTid, persistentConnection,
1307:                                    persistentConnectionId), pccatalog,
1308:                            pcschemaPattern, pcprocedureNamePattern,
1309:                            pccolumnNamePattern));
1310:                } catch (SQLException e) {
1311:                    if (logger.isWarnEnabled())
1312:                        logger
1313:                                .warn(
1314:                                        "Error while calling databaseMetaDataGetProcedureColumns",
1315:                                        e);
1316:                    sendToDriver(e);
1317:                }
1318:            }
1319:
1320:            /**
1321:             * @see java.sql.DatabaseMetaData#getProcedures(java.lang.String,
1322:             *      java.lang.String, java.lang.String)
1323:             */
1324:            private void databaseMetaDataGetProcedures() throws IOException {
1325:                if (logger.isDebugEnabled())
1326:                    logger.debug("DatabaseMetaDataGetProcedures command");
1327:
1328:                String rcatalog = in.readLongUTF();
1329:                String rschemaPattern = in.readLongUTF();
1330:                String procedureNamePattern = in.readLongUTF();
1331:
1332:                try {
1333:                    sendToDriver(vdb.getDynamicMetaData().getProcedures(
1334:                            new ConnectionContext(login, transactionStarted,
1335:                                    currentTid, persistentConnection,
1336:                                    persistentConnectionId), rcatalog,
1337:                            rschemaPattern, procedureNamePattern));
1338:                } catch (SQLException e) {
1339:                    if (logger.isWarnEnabled())
1340:                        logger
1341:                                .warn(
1342:                                        "Error while calling databaseMetaDataGetProcedures",
1343:                                        e);
1344:                    sendToDriver(e);
1345:                }
1346:            }
1347:
1348:            /**
1349:             * @see java.sql.DatabaseMetaData#getSchemas()
1350:             */
1351:            private void databaseMetaDataGetSchemas() throws IOException {
1352:                if (logger.isDebugEnabled())
1353:                    logger.debug("DatabaseMetaDataGetSchemas Types command");
1354:
1355:                try {
1356:                    sendToDriver(vdb.getDynamicMetaData().getSchemas(
1357:                            new ConnectionContext(login, transactionStarted,
1358:                                    currentTid, persistentConnection,
1359:                                    persistentConnectionId)));
1360:                } catch (SQLException e) {
1361:                    if (logger.isWarnEnabled())
1362:                        logger
1363:                                .warn(
1364:                                        "Error while calling databaseMetaDataGetSchemas",
1365:                                        e);
1366:                    sendToDriver(e);
1367:                }
1368:            }
1369:
1370:            /**
1371:             * @see java.sql.DatabaseMetaData#getSuperTables(java.lang.String,
1372:             *      java.lang.String, java.lang.String)
1373:             */
1374:            private void databaseMetaDataGetSuperTables() throws IOException {
1375:                if (logger.isDebugEnabled())
1376:                    logger.debug("DatabaseMetaDataGetSuperTables command");
1377:
1378:                String catalog = in.readLongUTF();
1379:                String schemaPattern = in.readLongUTF();
1380:                String tableNamePattern = in.readLongUTF();
1381:
1382:                try {
1383:                    sendToDriver(vdb.getDynamicMetaData().getSuperTables(
1384:                            new ConnectionContext(login, transactionStarted,
1385:                                    currentTid, persistentConnection,
1386:                                    persistentConnectionId), catalog,
1387:                            schemaPattern, tableNamePattern));
1388:                } catch (SQLException e) {
1389:                    if (logger.isWarnEnabled())
1390:                        logger
1391:                                .warn(
1392:                                        "Error while calling databaseMetaDataGetSuperTables",
1393:                                        e);
1394:                    sendToDriver(e);
1395:                }
1396:            }
1397:
1398:            /**
1399:             * @see java.sql.DatabaseMetaData#getSuperTypes(java.lang.String,
1400:             *      java.lang.String, java.lang.String)
1401:             */
1402:            private void databaseMetaDataGetSuperTypes() throws IOException {
1403:                if (logger.isDebugEnabled())
1404:                    logger.debug("DatabaseMetaDataGetSuperTables command");
1405:
1406:                String catalog = in.readLongUTF();
1407:                String schemaPattern = in.readLongUTF();
1408:                String tableNamePattern = in.readLongUTF();
1409:
1410:                try {
1411:                    sendToDriver(vdb.getDynamicMetaData().getSuperTypes(
1412:                            new ConnectionContext(login, transactionStarted,
1413:                                    currentTid, persistentConnection,
1414:                                    persistentConnectionId), catalog,
1415:                            schemaPattern, tableNamePattern));
1416:                } catch (SQLException e) {
1417:                    if (logger.isWarnEnabled())
1418:                        logger
1419:                                .warn(
1420:                                        "Error while calling databaseMetaDataGetSuperTypes",
1421:                                        e);
1422:                    sendToDriver(e);
1423:                }
1424:            }
1425:
1426:            /**
1427:             * @see java.sql.DatabaseMetaData#getTablePrivileges(java.lang.String,
1428:             *      java.lang.String, java.lang.String)
1429:             */
1430:            private void databaseMetaDataGetTablePrivileges()
1431:                    throws IOException {
1432:                if (logger.isDebugEnabled())
1433:                    logger.debug("DatabaseMetaDataGetTablePrivileges command");
1434:
1435:                String tpcatalog = in.readLongUTF();
1436:                String tpschemaPattern = in.readLongUTF();
1437:                String tptablePattern = in.readLongUTF();
1438:
1439:                try {
1440:                    sendToDriver(vdb.getDynamicMetaData().getTablePrivileges(
1441:                            new ConnectionContext(login, transactionStarted,
1442:                                    currentTid, persistentConnection,
1443:                                    persistentConnectionId), tpcatalog,
1444:                            tpschemaPattern, tptablePattern));
1445:                } catch (SQLException e) {
1446:                    if (logger.isWarnEnabled())
1447:                        logger
1448:                                .warn(
1449:                                        "Error while calling databaseMetaDataGetTablePrivileges",
1450:                                        e);
1451:                    sendToDriver(e);
1452:                }
1453:            }
1454:
1455:            /**
1456:             * @see java.sql.DatabaseMetaData#getTables(java.lang.String,
1457:             *      java.lang.String, java.lang.String, java.lang.String[])
1458:             */
1459:            private void databaseMetaDataGetTables() throws IOException {
1460:                if (logger.isDebugEnabled())
1461:                    logger.debug("DatabaseMetaDataGetTables command");
1462:
1463:                String tcatalog = in.readLongUTF();
1464:                String tschemaPattern = in.readLongUTF();
1465:                String ttableNamePattern = in.readLongUTF();
1466:
1467:                String[] ttypes = null;
1468:                if (in.readBoolean()) {
1469:                    int size = in.readInt();
1470:                    ttypes = new String[size];
1471:                    for (int i = 0; i < size; i++)
1472:                        ttypes[i] = in.readLongUTF();
1473:                }
1474:
1475:                try {
1476:                    sendToDriver(vdb.getDynamicMetaData().getTables(
1477:                            new ConnectionContext(login, transactionStarted,
1478:                                    currentTid, persistentConnection,
1479:                                    persistentConnectionId), tcatalog,
1480:                            tschemaPattern, ttableNamePattern, ttypes));
1481:                } catch (SQLException e) {
1482:                    if (logger.isWarnEnabled())
1483:                        logger
1484:                                .warn(
1485:                                        "Error while calling databaseMetaDataGetTables",
1486:                                        e);
1487:                    sendToDriver(e);
1488:                }
1489:            }
1490:
1491:            /**
1492:             * @see java.sql.DatabaseMetaData#getTableTypes()
1493:             */
1494:            private void databaseMetaDataGetTableTypes() throws IOException {
1495:                if (logger.isDebugEnabled())
1496:                    logger.debug("DatabaseMetaDataGetTableTypes command");
1497:
1498:                try {
1499:                    sendToDriver(vdb.getDynamicMetaData().getTableTypes(
1500:                            new ConnectionContext(login, transactionStarted,
1501:                                    currentTid, persistentConnection,
1502:                                    persistentConnectionId)));
1503:                } catch (SQLException e) {
1504:                    if (logger.isWarnEnabled())
1505:                        logger
1506:                                .warn(
1507:                                        "Error while calling databaseMetaDataGetTableTypes",
1508:                                        e);
1509:                    sendToDriver(e);
1510:                }
1511:            }
1512:
1513:            /**
1514:             * @see java.sql.DatabaseMetaData#getTypeInfo()
1515:             */
1516:            private void databaseMetaDataGetTypeInfo() throws IOException {
1517:                if (logger.isDebugEnabled())
1518:                    logger.debug("DatabaseMetaDataGetTypeInfo command");
1519:
1520:                try {
1521:                    sendToDriver(vdb.getDynamicMetaData().getTypeInfo(
1522:                            new ConnectionContext(login, transactionStarted,
1523:                                    currentTid, persistentConnection,
1524:                                    persistentConnectionId)));
1525:                } catch (SQLException e) {
1526:                    if (logger.isWarnEnabled())
1527:                        logger
1528:                                .warn(
1529:                                        "Error while calling databaseMetaDataGetTypeInfo",
1530:                                        e);
1531:                    sendToDriver(e);
1532:                }
1533:            }
1534:
1535:            /**
1536:             * @see java.sql.DatabaseMetaData#getUDTs(java.lang.String, java.lang.String,
1537:             *      java.lang.String, int[])
1538:             */
1539:            private void databaseMetaDataGetUDTs() throws IOException {
1540:                if (logger.isDebugEnabled())
1541:                    logger.debug("DatabaseMetaDataGetUDTs command");
1542:
1543:                String catalog = in.readLongUTF();
1544:                String schemaPattern = in.readLongUTF();
1545:                String tableNamePattern = in.readLongUTF();
1546:
1547:                int[] types = null;
1548:                if (in.readBoolean()) {
1549:                    int size = in.readInt();
1550:                    types = new int[size];
1551:                    for (int i = 0; i < size; i++)
1552:                        types[i] = in.readInt();
1553:                }
1554:
1555:                try {
1556:                    sendToDriver(vdb.getDynamicMetaData().getUDTs(
1557:                            new ConnectionContext(login, transactionStarted,
1558:                                    currentTid, persistentConnection,
1559:                                    persistentConnectionId), catalog,
1560:                            schemaPattern, tableNamePattern, types));
1561:                } catch (SQLException e) {
1562:                    if (logger.isWarnEnabled())
1563:                        logger.warn(
1564:                                "Error while calling databaseMetaDataGetUDTs",
1565:                                e);
1566:                    sendToDriver(e);
1567:                }
1568:            }
1569:
1570:            /**
1571:             * @see java.sql.DatabaseMetaData#getVersionColumns(java.lang.String,
1572:             *      java.lang.String, java.lang.String)
1573:             */
1574:            private void databaseMetaDataGetVersionColumns() throws IOException {
1575:                if (logger.isDebugEnabled())
1576:                    logger.debug("DatabaseMetaDataGetVersionColumns command");
1577:
1578:                String catalog = in.readLongUTF();
1579:                String schema = in.readLongUTF();
1580:                String table = in.readLongUTF();
1581:
1582:                try {
1583:                    sendToDriver(vdb.getDynamicMetaData().getVersionColumns(
1584:                            new ConnectionContext(login, transactionStarted,
1585:                                    currentTid, persistentConnection,
1586:                                    persistentConnectionId), catalog, schema,
1587:                            table));
1588:                } catch (SQLException e) {
1589:                    if (logger.isWarnEnabled())
1590:                        logger
1591:                                .warn(
1592:                                        "Error while calling databaseMetaDataGetVersionColumns",
1593:                                        e);
1594:                    sendToDriver(e);
1595:                }
1596:            }
1597:
1598:            /**
1599:             * Get the static metadata key from the socket and return the corresponding
1600:             * metadata.
1601:             * 
1602:             * @throws IOException if an IO error occurs
1603:             * @throws NotImplementedException if the underlying metadata access method is
1604:             *           not implemented
1605:             */
1606:            private void databaseStaticMetadata() throws IOException,
1607:                    NotImplementedException {
1608:                // the "getXXX(Y,Z,...)" hash key of the metadata
1609:                // query called by the client using the driver.
1610:                String key = in.readLongUTF();
1611:                if (logger.isDebugEnabled())
1612:                    logger.debug("DatabaseStaticMetadata command for " + key);
1613:                MetadataContainer container = vdb.getStaticMetaData()
1614:                        .getMetadataContainer();
1615:                if (container == null) // no metadata has been gathered yet from backends
1616:                {
1617:                    String msg = "No metadata is available probably because no backend is enabled on that controller.";
1618:                    logger.info(msg);
1619:                    sendToDriver(new SQLException(msg));
1620:                } else {
1621:                    /**
1622:                     * To get an exhaustive list of all the types of java objects stored in
1623:                     * this hash table, search for all callers of
1624:                     * {@link org.continuent.sequoia.driver.DatabaseMetaData#getMetadata(String, Class[], Object[], boolean)}
1625:                     * and see also
1626:                     * {@link org.continuent.sequoia.controller.backend.DatabaseBackendMetaData#retrieveDatabaseMetadata()}
1627:                     * At this time it's limited to the following types: String, int and
1628:                     * boolean. boolean is the most frequent.
1629:                     */
1630:                    /*
1631:                     * Since we don't expect that any of these metadata methods will ever
1632:                     * return a non- java.sql.Types, we re-use here the serialization
1633:                     * implemented for SQL Data/ResultSets elements.
1634:                     */
1635:
1636:                    SQLDataSerialization.Serializer serializer;
1637:                    Object result = container.get(key);
1638:
1639:                    try {
1640:                        serializer = SQLDataSerialization.getSerializer(result);
1641:                        // TODO: clean-up this.
1642:                        if (serializer.isUndefined()) // <=> result == null
1643:                            throw new NotImplementedException();
1644:                    } catch (NotImplementedException innerEx) { // Should we just print a warning in case result == null ?
1645:                        // This should never happen with decent drivers.
1646:                        String msg;
1647:                        if (null == result)
1648:                            msg = " returned a null object.";
1649:                        else
1650:                            msg = " returned an object of an unsupported java type:"
1651:                                    + result.getClass().getName() + ".";
1652:
1653:                        NotImplementedException outerEx = new NotImplementedException(
1654:                                "Backend driver method " + key + msg);
1655:                        outerEx.initCause(innerEx);
1656:                        throw outerEx;
1657:                    }
1658:
1659:                    TypeTag.NOT_EXCEPTION.sendToStream(out);
1660:                    serializer.getTypeTag().sendToStream(out);
1661:                    serializer.sendToStream(result, out);
1662:                }
1663:
1664:                out.flush();
1665:            }
1666:
1667:            private void preparedStatementGetMetaData() throws IOException {
1668:                if (logger.isDebugEnabled())
1669:                    logger.debug("PreparedStatementGetMetaData command");
1670:
1671:                String sqlTemplate = in.readLongUTF();
1672:
1673:                try {
1674:                    AbstractRequest request = new UnknownWriteRequest(
1675:                            sqlTemplate, false, 0, "");
1676:                    request.setIsAutoCommit(!transactionStarted);
1677:                    setRequestParametersAndTransactionStarted(request);
1678:                    sendToDriver(vdb.getPreparedStatementGetMetaData(request));
1679:                } catch (SQLException e) {
1680:                    if (logger.isWarnEnabled())
1681:                        logger
1682:                                .warn(
1683:                                        "Error while calling databaseMetaDataGetVersionColumns",
1684:                                        e);
1685:                    sendToDriver(e);
1686:                }
1687:            }
1688:
1689:            private void getControllerVersionNumber() throws IOException {
1690:                if (logger.isDebugEnabled())
1691:                    logger.debug("GetControllerVersionNumber command");
1692:
1693:                sendToDriver(Constants.VERSION);
1694:            }
1695:
1696:            private void getVirtualDatabaseName() throws IOException {
1697:                if (logger.isDebugEnabled())
1698:                    logger.debug("GetVirtualDatabaseName command");
1699:
1700:                sendToDriver(vdb.getDatabaseName());
1701:            }
1702:
1703:            //
1704:            // Transaction management
1705:            //
1706:
1707:            /**
1708:             * Check that we did not get a concurrent abort due to deadlock detection.
1709:             * 
1710:             * @param request request that was executing
1711:             * @throws SQLException if a concurrent abort has been detected
1712:             */
1713:            private void checkForConcurrentAbort(AbstractRequest request)
1714:                    throws SQLException {
1715:                if (transactionStarted) {
1716:                    // 
1717:                    synchronized (this ) {
1718:                        if (transactionHasAborted) {
1719:                            /*
1720:                             * If the transaction was aborted before we execute we would never
1721:                             * have reached this point and vdb.execWriteRequest(write) would have
1722:                             * thrown a SQLException. Now we have to force a rollback because we
1723:                             * have probably lazily re-started the transaction and that has to be
1724:                             * cleaned up.
1725:                             */
1726:                            vdb.rollback(currentTid,
1727:                                    writeQueryExecutedInThisTransaction);
1728:                            throw new SQLException("Transaction " + currentTid
1729:                                    + " aborted, request " + request
1730:                                    + "failed.");
1731:                        }
1732:                    }
1733:                }
1734:            }
1735:
1736:            /**
1737:             * Commit the current transaction and reset the transaction state. If
1738:             * sendTransactionId is true, the current transaction id is send back to the
1739:             * driver else 'true' is sent back. See SEQUOIA-703.
1740:             * 
1741:             * @throws SQLException if an error occurs at commit time
1742:             * @throws IOException if an error occurs when sending the value to the driver
1743:             */
1744:            private void commit() throws SQLException, IOException {
1745:                if (logger.isDebugEnabled())
1746:                    logger.debug("Commit command");
1747:
1748:                if (!transactionHasAborted)
1749:                    vdb.commit(currentTid, writeQueryExecutedInThisTransaction,
1750:                            !queryExecutedInThisTransaction);
1751:                else if (logger.isWarnEnabled()) {
1752:                    logger.warn("Transaction " + currentTid
1753:                            + " was aborted by database");
1754:                }
1755:
1756:                // acknowledged the commit (even if transaction is aborted)
1757:                sendToDriver(currentTid);
1758:
1759:                resetTransactionState();
1760:            }
1761:
1762:            private void begin() throws SQLException, IOException {
1763:                if (logger.isDebugEnabled())
1764:                    logger.debug("Begin command");
1765:
1766:                currentTid = vdb.begin(login, persistentConnection,
1767:                        persistentConnectionId);
1768:                sendToDriver(currentTid);
1769:
1770:                transactionStarted = true;
1771:                transactionHasAborted = false;
1772:                queryExecutedInThisTransaction = false;
1773:                writeQueryExecutedInThisTransaction = false;
1774:                hasSavepoint = 0;
1775:            }
1776:
1777:            /*
1778:             * reset transaction State, begin will be initiated by driver
1779:             */
1780:            private void resetTransactionState() {
1781:                currentTid = 0;
1782:                transactionStarted = false;
1783:                transactionHasAborted = false;
1784:                queryExecutedInThisTransaction = false;
1785:                writeQueryExecutedInThisTransaction = false;
1786:                hasSavepoint = 0;
1787:            }
1788:
1789:            private void rollback() throws SQLException, IOException {
1790:                if (logger.isDebugEnabled())
1791:                    logger.debug("Rollback command");
1792:
1793:                if (!transactionHasAborted)
1794:                    vdb.rollback(currentTid,
1795:                            writeQueryExecutedInThisTransaction);
1796:                else if (logger.isWarnEnabled()) {
1797:                    logger.warn("Transaction " + currentTid
1798:                            + " was aborted by database");
1799:                }
1800:
1801:                // acknowledged the rollback (even if transaction is aborted)
1802:                sendToDriver(currentTid);
1803:
1804:                resetTransactionState();
1805:            }
1806:
1807:            private void setNamedSavepoint() throws SQLException, IOException {
1808:                if (logger.isDebugEnabled())
1809:                    logger.debug("Set named savepoint command");
1810:
1811:                String savepointName = in.readLongUTF();
1812:
1813:                // Check if this is not a duplicate savepoints
1814:                if (vdb.getRequestManager().hasSavepoint(new Long(currentTid),
1815:                        savepointName))
1816:                    throw new SQLException("A savepoint named " + savepointName
1817:                            + " already exists for transaction " + currentTid);
1818:
1819:                vdb.setSavepoint(currentTid, savepointName);
1820:                writeQueryExecutedInThisTransaction = true;
1821:                hasSavepoint++;
1822:                sendToDriver(true);
1823:            }
1824:
1825:            private void setUnnamedSavepoint() throws SQLException, IOException {
1826:                if (logger.isDebugEnabled())
1827:                    logger.debug("Set unnamed savepoint command");
1828:
1829:                int savepointId = vdb.setSavepoint(currentTid);
1830:                writeQueryExecutedInThisTransaction = true;
1831:                hasSavepoint++;
1832:                sendToDriver(savepointId);
1833:            }
1834:
1835:            private void releaseSavepoint() throws SQLException, IOException {
1836:                if (logger.isDebugEnabled())
1837:                    logger.debug("Release savepoint command");
1838:                String savepointName = in.readLongUTF();
1839:                vdb.releaseSavepoint(currentTid, savepointName);
1840:                hasSavepoint--;
1841:                sendToDriver(true);
1842:            }
1843:
1844:            private void rollbackToSavepoint() throws SQLException, IOException {
1845:                if (logger.isDebugEnabled())
1846:                    logger.debug("Rollback to savepoint command");
1847:                String savepointName = in.readLongUTF();
1848:                vdb.rollback(currentTid, savepointName);
1849:                hasSavepoint = vdb
1850:                        .getNumberOfSavepointsInTransaction(currentTid);
1851:                sendToDriver(true);
1852:            }
1853:
1854:            private void retrieveReleaseSavepoint() throws IOException {
1855:                if (logger.isDebugEnabled())
1856:                    logger.debug("Retrieve release savepoint command");
1857:
1858:                // Wait for failover to be authorized
1859:                waitForWritesFlushed(currentTid);
1860:
1861:                String savepointName = in.readLongUTF();
1862:                sendToDriver(!vdb.getRequestManager().hasSavepoint(
1863:                        new Long(currentTid), savepointName));
1864:            }
1865:
1866:            //
1867:            // Decoding commands from the stream
1868:            //
1869:
1870:            /**
1871:             * Read a request (without ResultSet parameters) send by the
1872:             * <code>Connection</code> object.
1873:             * 
1874:             * @return an instance of <code>AbstractRequest</code>
1875:             * @throws IOException if an error occurs in the procotol
1876:             * @throws BadJDBCApiUsageException if the decoded request does not match
1877:             *           anything we can handle
1878:             * @see Request#Request(DriverBufferedInputStream)
1879:             */
1880:            private AbstractRequest decodeRequestFromStream()
1881:                    throws IOException, BadJDBCApiUsageException {
1882:                // Get request from the socket
1883:                Request driverRequest = new Request(in);
1884:
1885:                String sqlQuery = driverRequest.getSqlQueryOrTemplate();
1886:
1887:                if (!requestFactory.isAuthorizedRequest(sqlQuery))
1888:                    throw new BadJDBCApiUsageException(
1889:                            "The following statement is not authorized to execute on the cluster (check your user documentation): "
1890:                                    + sqlQuery);
1891:
1892:                AbstractRequest decodedRequest = requestFactory
1893:                        .requestFromString(sqlQuery, false, driverRequest
1894:                                .isEscapeProcessing(), driverRequest
1895:                                .getTimeoutInSeconds(), connectionLineSeparator);
1896:                if (decodedRequest == null)
1897:                    throw new BadJDBCApiUsageException(
1898:                            "SQL statement does not match a query returning an update count ("
1899:                                    + sqlQuery + ")");
1900:
1901:                decodedRequest.setPreparedStatementParameters(driverRequest
1902:                        .getPreparedStatementParameters());
1903:                decodedRequest.setIsAutoCommit(driverRequest.isAutoCommit());
1904:                return decodedRequest;
1905:            }
1906:
1907:            /**
1908:             * Read a request with ResultSet parameters send by the
1909:             * <code>Connection</code> object.
1910:             * 
1911:             * @param isExecuteQuery set to true if the received query is probably a read
1912:             *          statement (i.e. called by an executeQuery-like statement). This
1913:             *          will give priority to the parsing of read requests.
1914:             * @return an instance of <code>AbstractRequest</code>
1915:             * @throws IOException if an error occurs in the procotol
1916:             * @throws BadJDBCApiUsageException if the request is not authorized to
1917:             *           execute
1918:             * @see RequestWithResultSetParameters#RequestWithResultSetParameters(DriverBufferedInputStream)
1919:             */
1920:            private AbstractRequest decodeRequestWithResultSetParametersFromStream(
1921:                    boolean isExecuteQuery) throws IOException,
1922:                    BadJDBCApiUsageException {
1923:                RequestWithResultSetParameters driverRequest = new RequestWithResultSetParameters(
1924:                        in);
1925:
1926:                String sqlQuery = driverRequest.getSqlQueryOrTemplate();
1927:
1928:                if (!requestFactory.isAuthorizedRequest(sqlQuery))
1929:                    throw new BadJDBCApiUsageException(
1930:                            "The following statement is not authorized to execute on the cluster (check your user documentation): "
1931:                                    + sqlQuery);
1932:
1933:                AbstractRequest decodedRequest = requestFactory
1934:                        .requestFromString(sqlQuery, isExecuteQuery,
1935:                                driverRequest.isEscapeProcessing(),
1936:                                driverRequest.getTimeoutInSeconds(),
1937:                                connectionLineSeparator);
1938:                if (decodedRequest == null) {
1939:                    decodedRequest = new UnknownWriteRequest(sqlQuery,
1940:                            driverRequest.isEscapeProcessing(), driverRequest
1941:                                    .getTimeoutInSeconds(),
1942:                            connectionLineSeparator);
1943:                }
1944:                decodedRequest.setPreparedStatementParameters(driverRequest
1945:                        .getPreparedStatementParameters());
1946:                decodedRequest.setIsAutoCommit(driverRequest.isAutoCommit());
1947:                decodedRequest.setMaxRows(driverRequest.getMaxRows());
1948:                decodedRequest.setFetchSize(driverRequest.getFetchSize());
1949:                decodedRequest.setCursorName(driverRequest.getCursorName());
1950:                return decodedRequest;
1951:            }
1952:
1953:            /**
1954:             * Log a transaction begin if needed for the AbstractRequest.<br />
1955:             * The transaction is started only if needed (if the request is the first
1956:             * write request for the current transaction)
1957:             * 
1958:             * @param request a request
1959:             * @throws SQLException if the transaction has aborted
1960:             */
1961:            private synchronized void logTransactionBegin(
1962:                    AbstractRequest request) throws SQLException {
1963:                transactionStarted = setRequestParameters(request, login,
1964:                        currentTid, transactionStarted);
1965:
1966:                if (connectionHasClosed)
1967:                    throw new SQLException(
1968:                            "Persistent connection is closed, cannot execute query "
1969:                                    + request);
1970:
1971:                if (transactionHasAborted)
1972:                    throw new SQLException(
1973:                            "Transaction is aborted, cannot execute query "
1974:                                    + request);
1975:
1976:                if (!transactionStarted)
1977:                    currentTid = 0;
1978:                else {
1979:                    // Transaction not started, check if we should do a lazy start
1980:                    queryExecutedInThisTransaction = true;
1981:                    writeQueryExecutedInThisTransaction = true;
1982:                }
1983:            }
1984:
1985:            /**
1986:             * Set the login and transaction id on the given request. If the request is
1987:             * autocommit and a transaction was started, the transaction is first commited
1988:             * to return in autocommit mode.
1989:             * 
1990:             * @param request The request to set
1991:             * @param login user login to set
1992:             * @param tid the transaction id to set
1993:             * @return new value of transaction started
1994:             */
1995:            private boolean setRequestParameters(AbstractRequest request,
1996:                    String login, long tid, boolean transactionStarted)
1997:                    throws SQLException {
1998:                request.setClientIpAddress(clientIpAddress);
1999:                request.setLogin(login);
2000:                request.setTransactionIsolation(transactionIsolation);
2001:                request.setLineSeparator(connectionLineSeparator);
2002:                request.setPersistentConnection(persistentConnection);
2003:                request.setPersistentConnectionId(persistentConnectionId);
2004:                request.setRetrieveSQLWarnings(retrieveSQLWarnings);
2005:                request.setIsReadOnly(isReadOnly);
2006:                if (request.isAutoCommit() && transactionStarted) {
2007:                    vdb.commit(tid, writeQueryExecutedInThisTransaction,
2008:                            !queryExecutedInThisTransaction);
2009:                    return false;
2010:                } else
2011:                    request.setTransactionId(tid);
2012:                request.setId(vdb.getNextRequestId());
2013:                return transactionStarted;
2014:            }
2015:
2016:            private void setRequestParametersAndTransactionStarted(
2017:                    AbstractRequest request) throws SQLException {
2018:                synchronized (this ) {
2019:                    transactionStarted = setRequestParameters(request, login,
2020:                            currentTid, transactionStarted);
2021:
2022:                    if (connectionHasClosed)
2023:                        throw new SQLException(
2024:                                "Persistent connection is closed, cannot execute query "
2025:                                        + request);
2026:
2027:                    if (transactionHasAborted)
2028:                        throw new SQLException(
2029:                                "Transaction is aborted, cannot execute query "
2030:                                        + request);
2031:
2032:                    if (!transactionStarted)
2033:                        currentTid = 0;
2034:                    else
2035:                        queryExecutedInThisTransaction = true;
2036:                }
2037:            }
2038:
2039:            //
2040:            // Request execution
2041:            //
2042:
2043:            private void statementExecuteQuery(SelectRequest decodedRequest)
2044:                    throws IOException, SQLException, BadJDBCApiUsageException {
2045:                if (logger.isDebugEnabled())
2046:                    logger.debug("StatementExecuteQuery command");
2047:                AbstractRequest request = decodedRequest;
2048:                if (decodedRequest == null)
2049:                    request = decodeRequestWithResultSetParametersFromStream(true);
2050:
2051:                if (request instanceof  SelectRequest) {
2052:                    SelectRequest select = (SelectRequest) request;
2053:                    setRequestParametersAndTransactionStarted(select);
2054:
2055:                    // Here, if the transaction isolation level was set to SERIALIZABLE, we
2056:                    // need to broadcast the request to all controllers
2057:                    if (!request.isAutoCommit()
2058:                            && requestFactory
2059:                                    .isBroadcastRequired(transactionIsolation)) {
2060:                        select.setMustBroadcast(true);
2061:                        writeQueryExecutedInThisTransaction = true;
2062:                    }
2063:
2064:                    // send the resultset
2065:                    ControllerResultSet crs = vdb.statementExecuteQuery(select);
2066:
2067:                    checkForConcurrentAbort(select);
2068:
2069:                    // If this is a remapping of the call, we have to send the id back
2070:                    if (decodedRequest != null)
2071:                        sendToDriver(select.getId());
2072:
2073:                    // send statement warnings
2074:                    sendToDriver(crs.getStatementWarnings());
2075:
2076:                    sendToDriver(crs);
2077:
2078:                    // streaming
2079:                    if (crs.hasMoreData())
2080:                        streamedResultSets.put(crs.getCursorName(), crs);
2081:                } else if (request instanceof  StoredProcedure) { // This is a stored procedure
2082:                    if (logger.isInfoEnabled())
2083:                        logger
2084:                                .info("Statement.executeQuery() detected a stored procedure ("
2085:                                        + request
2086:                                        + ") remapping the call to CallableStatement.executeQuery()");
2087:                    callableStatementExecuteQuery((StoredProcedure) request,
2088:                            false);
2089:                    return;
2090:                } else
2091:                    throw new BadJDBCApiUsageException(
2092:                            "Statement.executeQuery() not allowed for requests returning an update count ("
2093:                                    + request + ")");
2094:            }
2095:
2096:            /**
2097:             * Execute a write request that returns an int.
2098:             * 
2099:             * @param decodedRequest an already decoded request or null
2100:             * @throws IOException if an error occurs with the socket
2101:             * @throws SQLException if an error occurs while executing the request
2102:             * @throws BadJDBCApiUsageException if a query returning a ResultSet is called
2103:             */
2104:            private void statementExecuteUpdate(
2105:                    AbstractWriteRequest decodedRequest) throws IOException,
2106:                    SQLException, BadJDBCApiUsageException {
2107:                if (logger.isDebugEnabled())
2108:                    logger.debug("StatementExecuteUpdate command");
2109:
2110:                AbstractRequest request = decodedRequest;
2111:                if (request == null) {
2112:                    try {
2113:                        request = decodeRequestFromStream();
2114:                    } catch (BadJDBCApiUsageException e) {
2115:                        throw new BadJDBCApiUsageException(
2116:                                "Statement.executeUpdate() not allowed for requests returning a ResultSet",
2117:                                e);
2118:                    }
2119:                    logTransactionBegin(request);
2120:                }
2121:
2122:                try {
2123:                    AbstractWriteRequest write = (AbstractWriteRequest) request;
2124:
2125:                    // At this point we don't have a stored procedure
2126:                    // Send query id to driver for failover
2127:                    sendToDriver(request.getId());
2128:
2129:                    // Execute the request
2130:                    ExecuteUpdateResult result = vdb
2131:                            .statementExecuteUpdate(write);
2132:                    // Check if there was an issue with deadlock detection
2133:                    checkForConcurrentAbort(write);
2134:                    // Send SQL Warnings if any
2135:                    sendToDriver(result.getStatementWarnings());
2136:                    // Send result back
2137:                    sendToDriver(result.getUpdateCount());
2138:                } catch (ClassCastException e) {
2139:                    if (request instanceof  StoredProcedure) {
2140:                        if (logger.isInfoEnabled())
2141:                            logger
2142:                                    .info("Statement.executeUpdate() detected a stored procedure ("
2143:                                            + request
2144:                                            + ") remapping the call to CallableStatement.executeUpdate()");
2145:                        callableStatementExecuteUpdate(
2146:                                (StoredProcedure) request, false);
2147:                        return;
2148:                    } else
2149:                        throw new BadJDBCApiUsageException(
2150:                                "Statement.executeUpdate() not allowed for requests returning a ResultSet ("
2151:                                        + request + ")");
2152:                }
2153:            }
2154:
2155:            private void statementExecuteUpdateWithKeys() throws IOException,
2156:                    SQLException, BadJDBCApiUsageException {
2157:                if (logger.isDebugEnabled())
2158:                    logger.debug("StatementExecuteUpdateWithKeys command");
2159:                try {
2160:                    // Get the request from the socket
2161:                    AbstractWriteRequest writeWithKeys;
2162:                    try {
2163:                        writeWithKeys = (AbstractWriteRequest) decodeRequestFromStream();
2164:                    } catch (BadJDBCApiUsageException e) {
2165:                        throw new BadJDBCApiUsageException(
2166:                                "Statement.executeUpdate() not allowed for requests returning a ResultSet",
2167:                                e);
2168:                    }
2169:                    logTransactionBegin(writeWithKeys);
2170:
2171:                    // Send query id to driver for failover
2172:                    sendToDriver(writeWithKeys.getId());
2173:
2174:                    // Execute the request
2175:                    GeneratedKeysResult updateCountWithKeys = vdb
2176:                            .statementExecuteUpdateWithKeys(writeWithKeys);
2177:                    // Check if there was an issue with deadlock detection
2178:                    checkForConcurrentAbort(writeWithKeys);
2179:                    // Send SQL Warnings if any
2180:                    sendToDriver(updateCountWithKeys.getStatementWarnings());
2181:                    // Send result back
2182:                    sendToDriver(updateCountWithKeys.getUpdateCount());
2183:                    ControllerResultSet rs = updateCountWithKeys
2184:                            .getControllerResultSet();
2185:                    sendToDriver(rs);
2186:
2187:                    // streaming
2188:                    if (rs.hasMoreData())
2189:                        streamedResultSets.put(rs.getCursorName(),
2190:                                updateCountWithKeys);
2191:                } catch (ClassCastException e) {
2192:                    throw new BadJDBCApiUsageException(
2193:                            "RETURN_GENERATED_KEYS is not supported for stored procedures");
2194:                }
2195:
2196:            }
2197:
2198:            private void statementExecute(AbstractRequest decodedRequest)
2199:                    throws IOException, SQLException {
2200:                if (logger.isDebugEnabled())
2201:                    logger.debug("statementExecute command");
2202:
2203:                AbstractRequest request = decodedRequest;
2204:                if (decodedRequest == null)
2205:                    try {
2206:                        request = decodeRequestWithResultSetParametersFromStream(false);
2207:                    } catch (BadJDBCApiUsageException e) {
2208:                        throw new SQLException(e.getMessage());
2209:                    }
2210:
2211:                synchronized (this ) {
2212:                    transactionStarted = setRequestParameters(request, login,
2213:                            currentTid, transactionStarted);
2214:
2215:                    if (connectionHasClosed)
2216:                        throw new SQLException(
2217:                                "Persistent connection is closed, cannot execute query "
2218:                                        + request);
2219:
2220:                    if (transactionHasAborted)
2221:                        throw new SQLException(
2222:                                "Transaction is aborted, cannot execute query "
2223:                                        + request);
2224:
2225:                    if (!transactionStarted)
2226:                        currentTid = 0;
2227:                    else
2228:                        queryExecutedInThisTransaction = true;
2229:                }
2230:
2231:                ExecuteResult result;
2232:                // Direct to Statement.execute() if this is an inline batch
2233:                // or if matching some statements (such as EXPLAIN ANALYZE in postgres)
2234:                if (requestFactory.requestNeedsExecute(request)) {
2235:                    // Send query id to driver for failover
2236:                    sendToDriver(request.getId());
2237:
2238:                    writeQueryExecutedInThisTransaction = true;
2239:
2240:                    if (request instanceof  SelectRequest) { // Convert to an unknown write request as expected by underlying
2241:                        // components (relates to SEQUOIA-674)
2242:                        UnknownWriteRequest writeRequest = new UnknownWriteRequest(
2243:                                request.getSqlOrTemplate(), request
2244:                                        .getEscapeProcessing(), request
2245:                                        .getTimeout(), request
2246:                                        .getLineSeparator());
2247:                        writeRequest.setIsAutoCommit(request.isAutoCommit());
2248:                        writeRequest.setTransactionId(request
2249:                                .getTransactionId());
2250:                        writeRequest.setTransactionIsolation(request
2251:                                .getTransactionIsolation());
2252:                        writeRequest.setId(request.getId());
2253:                        writeRequest.setLogin(request.getLogin());
2254:                        writeRequest.setPreparedStatementParameters(request
2255:                                .getPreparedStatementParameters());
2256:                        writeRequest.setTimeout(request.getTimeout());
2257:                        writeRequest.setMaxRows(request.getMaxRows());
2258:                        writeRequest.setPersistentConnection(request
2259:                                .isPersistentConnection());
2260:                        writeRequest.setPersistentConnectionId(request
2261:                                .getPersistentConnectionId());
2262:                        request = writeRequest;
2263:                    }
2264:
2265:                    result = vdb.statementExecute(request);
2266:                }
2267:                // Route to CallableStatement.execute() if this is a stored procedure
2268:                else if (request instanceof  StoredProcedure) {
2269:                    if (logger.isInfoEnabled())
2270:                        logger
2271:                                .info("Statement.execute() did detect a stored procedure ("
2272:                                        + request
2273:                                        + ") remapping the call to CallableStatement.execute()");
2274:
2275:                    writeQueryExecutedInThisTransaction = true;
2276:
2277:                    callableStatementExecute((StoredProcedure) request, false);
2278:                    return;
2279:                } else { // Route SELECT to Statement.executeQuery() and others to
2280:                    // Statement.executeUpdate()
2281:
2282:                    // Send query id to driver for failover (driver still expect a
2283:                    // Statement.execute() protocol)
2284:                    sendToDriver(request.getId());
2285:
2286:                    result = new ExecuteResult();
2287:                    if (request instanceof  SelectRequest) {
2288:                        // Here, if the transaction isolation level was set to SERIALIZABLE, we
2289:                        // need to broadcast the select request to all controllers
2290:                        if (!request.isAutoCommit()
2291:                                && requestFactory
2292:                                        .isBroadcastRequired(transactionIsolation)) {
2293:                            ((SelectRequest) request).setMustBroadcast(true);
2294:                            writeQueryExecutedInThisTransaction = true;
2295:                        }
2296:
2297:                        ControllerResultSet crs = vdb
2298:                                .statementExecuteQuery((SelectRequest) request);
2299:                        // call remapping: construct a ExecuteResult from a ExecuteUpdateResult
2300:                        result.addResult(crs);
2301:                        result.setStatementWarnings(crs.getStatementWarnings());
2302:                        result.addResult(-1);
2303:                        // streaming
2304:                        if (crs.hasMoreData())
2305:                            streamedResultSets.put(crs.getCursorName(), crs);
2306:                    } else {
2307:                        writeQueryExecutedInThisTransaction = true;
2308:
2309:                        ExecuteUpdateResult updateCount = vdb
2310:                                .statementExecuteUpdate((AbstractWriteRequest) request);
2311:                        // call remapping: construct a ExecuteResult from a ExecuteUpdateResult
2312:                        result.setStatementWarnings(updateCount
2313:                                .getStatementWarnings());
2314:                        result.addResult(updateCount.getUpdateCount());
2315:                        // end of result list marker
2316:                        if (updateCount.getUpdateCount() != -1)
2317:                            result.addResult(-1);
2318:                    }
2319:                }
2320:
2321:                checkForConcurrentAbort(request);
2322:
2323:                // Send SQL Warnings if any
2324:                sendToDriver(result.getStatementWarnings());
2325:
2326:                for (Iterator iter = result.getResults().iterator(); iter
2327:                        .hasNext();) {
2328:                    Object r = iter.next();
2329:                    if (r instanceof  Integer) {
2330:                        sendToDriver(false);
2331:                        sendToDriver(((Integer) r).intValue());
2332:                    } else if (r instanceof  ControllerResultSet) {
2333:                        sendToDriver(true);
2334:                        sendToDriver((ControllerResultSet) r);
2335:                    } else
2336:                        logger
2337:                                .error("Unexpected result " + r
2338:                                        + " in statementExecute for request "
2339:                                        + request);
2340:                }
2341:            }
2342:
2343:            /**
2344:             * @param decodedProc Stored procedure if called from statementExecuteQuery(),
2345:             *          otherwise null
2346:             * @param returnsOutParameters true if the call must return out/named
2347:             *          parameters
2348:             * @throws BadJDBCApiUsageException
2349:             */
2350:            private void callableStatementExecuteQuery(
2351:                    StoredProcedure decodedProc, boolean returnsOutParameters)
2352:                    throws IOException, SQLException, BadJDBCApiUsageException {
2353:                if (logger.isDebugEnabled())
2354:                    logger.debug("CallableStatementExecuteQuery command");
2355:
2356:                StoredProcedure proc = decodedProc;
2357:                if (proc == null) {
2358:                    AbstractRequest request = decodeRequestWithResultSetParametersFromStream(true);
2359:                    if (request == null)
2360:                        throw new ProtocolException(
2361:                                "Failed to decode stored procedure");
2362:
2363:                    try {
2364:                        // Fetch the query from the socket
2365:                        proc = (StoredProcedure) request;
2366:
2367:                        // Parse the query first to update the semantic information
2368:                        vdb.getRequestManager()
2369:                                .getParsingFromCacheOrParse(proc);
2370:
2371:                        // If procedure is read-only, we don't log lazy begin
2372:                        DatabaseProcedureSemantic semantic = proc.getSemantic();
2373:                        if ((semantic == null) || !semantic.isReadOnly())
2374:                            logTransactionBegin(proc);
2375:                    } catch (ClassCastException e) {
2376:                        if (request instanceof  SelectRequest) {
2377:                            if (logger.isInfoEnabled())
2378:                                logger
2379:                                        .info("CallableStatement.executeQuery() did not detect a stored procedure ("
2380:                                                + request
2381:                                                + ") remapping the call to Statement.executeQuery()");
2382:                            statementExecuteQuery((SelectRequest) request);
2383:                            if (returnsOutParameters)
2384:                                sendNamedAndOutParametersToDriver(request);
2385:                            return;
2386:                        }
2387:                        throw new BadJDBCApiUsageException(
2388:                                "Unhandled stored procedure call in " + request);
2389:                    }
2390:                }
2391:
2392:                setRequestParametersAndTransactionStarted(proc);
2393:
2394:                if (decodedProc == null) { // Send query id to driver for failover
2395:                    sendToDriver(proc.getId());
2396:                }
2397:                // else we come from statement.executeQuery and we should not send the id
2398:
2399:                // Execute the stored procedure
2400:                ControllerResultSet sprs = vdb
2401:                        .callableStatementExecuteQuery(proc);
2402:                checkForConcurrentAbort(proc);
2403:
2404:                // Send SQL Warnings if any
2405:                sendToDriver(sprs.getStatementWarnings());
2406:
2407:                sendToDriver(sprs);
2408:
2409:                // streaming
2410:                if (sprs.hasMoreData())
2411:                    streamedResultSets.put(sprs.getCursorName(), sprs);
2412:
2413:                if (returnsOutParameters)
2414:                    sendNamedAndOutParametersToDriver(proc);
2415:            }
2416:
2417:            /**
2418:             * @param sp Stored procedure if called from statementExecuteUpdate(),
2419:             *          otherwise null
2420:             * @param returnsOutParameters true if the call must return out/named
2421:             *          parameters
2422:             */
2423:            private void callableStatementExecuteUpdate(StoredProcedure sp,
2424:                    boolean returnsOutParameters) throws IOException,
2425:                    SQLException, BadJDBCApiUsageException {
2426:                if (logger.isDebugEnabled())
2427:                    logger.debug("CallableStatementExecuteUpdate command");
2428:
2429:                if (sp == null) {
2430:                    AbstractRequest request;
2431:                    try {
2432:                        request = decodeRequestFromStream();
2433:                    } catch (BadJDBCApiUsageException e) {
2434:                        throw new BadJDBCApiUsageException(
2435:                                "CallableStatement.executeUpdate() not allowed for requests returning a ResultSet ",
2436:                                e);
2437:                    }
2438:                    logTransactionBegin(request);
2439:
2440:                    try {
2441:                        // Fetch the query from the socket
2442:                        sp = (StoredProcedure) request;
2443:                    } catch (ClassCastException e) {
2444:                        if (request instanceof  AbstractWriteRequest) {
2445:                            if (logger.isInfoEnabled())
2446:                                logger
2447:                                        .info("CallableStatement.executeUpdate() did not detect a stored procedure ("
2448:                                                + request
2449:                                                + ") remapping the call to Statement.executeUpdate()");
2450:                            statementExecuteUpdate((AbstractWriteRequest) request);
2451:                            if (returnsOutParameters)
2452:                                sendNamedAndOutParametersToDriver(request);
2453:                            return;
2454:                        }
2455:                        throw new BadJDBCApiUsageException(
2456:                                "Unhandled stored procedure call in " + request);
2457:                    }
2458:                }
2459:
2460:                // Send query id to driver for failover
2461:                sendToDriver(sp.getId());
2462:
2463:                // Execute the query
2464:                ExecuteUpdateResult result = vdb
2465:                        .callableStatementExecuteUpdate(sp);
2466:                checkForConcurrentAbort(sp);
2467:                // Send SQL Warnings if any
2468:                sendToDriver(result.getStatementWarnings());
2469:                // Send result back
2470:                sendToDriver(result.getUpdateCount());
2471:
2472:                if (returnsOutParameters)
2473:                    sendNamedAndOutParametersToDriver(sp);
2474:            }
2475:
2476:            /**
2477:             * @param sp Stored procedure if called from statementExecute(), otherwise
2478:             *          null.
2479:             * @param returnsOutParameters true if the call must return out/named
2480:             *          parameters
2481:             */
2482:            private void callableStatementExecute(StoredProcedure sp,
2483:                    boolean returnsOutParameters) throws IOException,
2484:                    SQLException {
2485:                if (logger.isDebugEnabled())
2486:                    logger.debug("CallableStatementExecute command");
2487:
2488:                if (sp == null) {
2489:                    AbstractRequest request;
2490:                    try {
2491:                        request = decodeRequestWithResultSetParametersFromStream(false);
2492:                    } catch (BadJDBCApiUsageException e) {
2493:                        throw new SQLException(e.getMessage());
2494:                    }
2495:                    if (request == null)
2496:                        throw new ProtocolException(
2497:                                "Failed to decode stored procedure");
2498:                    try {
2499:                        // Fetch the query from the socket
2500:                        sp = (StoredProcedure) request;
2501:
2502:                        // Parse the query first to update the semantic information
2503:                        vdb.getRequestManager().getParsingFromCacheOrParse(sp);
2504:
2505:                        // If procedure is read-only, we don't log lazy begin
2506:                        DatabaseProcedureSemantic semantic = sp.getSemantic();
2507:                        if ((semantic == null) || !semantic.isReadOnly())
2508:                            logTransactionBegin(sp);
2509:                    } catch (ClassCastException e) {
2510:                        if (logger.isInfoEnabled())
2511:                            logger
2512:                                    .info("CallableStatement.execute() did not detect a stored procedure ("
2513:                                            + request
2514:                                            + ") remapping the call to Statement.execute()");
2515:                        statementExecute(request);
2516:                        if (returnsOutParameters)
2517:                            sendNamedAndOutParametersToDriver(request);
2518:                        return;
2519:                    }
2520:                }
2521:
2522:                setRequestParametersAndTransactionStarted(sp);
2523:
2524:                // Send query id to driver for failover
2525:                sendToDriver(sp.getId());
2526:
2527:                // Execute the query
2528:                ExecuteResult result = vdb.callableStatementExecute(sp);
2529:                checkForConcurrentAbort(sp);
2530:
2531:                // Send SQL Warnings if any
2532:                sendToDriver(result.getStatementWarnings());
2533:
2534:                for (Iterator iter = result.getResults().iterator(); iter
2535:                        .hasNext();) {
2536:                    Object r = iter.next();
2537:
2538:                    if (r instanceof  Integer) {
2539:                        sendToDriver(false);
2540:                        sendToDriver(((Integer) r).intValue());
2541:                    } else if (r instanceof  ControllerResultSet) {
2542:                        sendToDriver(true);
2543:                        sendToDriver((ControllerResultSet) r);
2544:                    } else
2545:                        logger.error("Unexepected result " + r
2546:                                + " in callableStatementExecute for request "
2547:                                + sp);
2548:                }
2549:                if (returnsOutParameters)
2550:                    sendNamedAndOutParametersToDriver(sp);
2551:            }
2552:
2553:            private void sendNamedAndOutParametersToDriver(
2554:                    AbstractRequest request) throws IOException,
2555:                    ProtocolException {
2556:                if (request instanceof  StoredProcedure) {
2557:                    try {
2558:                        StoredProcedure proc = (StoredProcedure) request;
2559:                        // First send the out parameters
2560:                        List outParamIndexes = proc.getOutParameterIndexes();
2561:                        if (outParamIndexes != null) {
2562:                            // Now send each param (index, then serializer and serialized object)
2563:                            for (Iterator iter = outParamIndexes.iterator(); iter
2564:                                    .hasNext();) {
2565:                                Integer index = (Integer) iter.next();
2566:                                sendToDriver(index.intValue());
2567:                                Object object = proc
2568:                                        .getOutParameterValue(index);
2569:                                sendObjectToDriver(object);
2570:                            }
2571:                        }
2572:                        sendToDriver(0);
2573:
2574:                        // Fetch the named parameters
2575:                        List namedParamNames = proc.getNamedParameterNames();
2576:                        if (namedParamNames != null) {
2577:                            for (Iterator iter = namedParamNames.iterator(); iter
2578:                                    .hasNext();) {
2579:                                // Send param name first
2580:                                String paramName = (String) iter.next();
2581:                                sendToDriver(paramName);
2582:                                // Now send value (serializer first then serialized object)
2583:                                Object object = proc
2584:                                        .getNamedParameterValue(paramName);
2585:                                sendObjectToDriver(object);
2586:                            }
2587:                        }
2588:                        sendToDriver("0");
2589:                    } catch (NotImplementedException e) {
2590:                        String msg = "Unable to serialize parameter result for request "
2591:                                + request;
2592:                        logger.error(msg, e);
2593:                        throw new ProtocolException(msg);
2594:                    }
2595:                } else
2596:                // Not a stored procedure (remapped call)
2597:                {
2598:                    // No out parameter
2599:                    sendToDriver(0);
2600:                    // No named parameter
2601:                    sendToDriver("0");
2602:                }
2603:            }
2604:
2605:            /**
2606:             * Send an object to the driver (first tag then serialized object).
2607:             * 
2608:             * @param object object to send
2609:             * @throws IOException if an error occurs with the socket
2610:             * @throws NotImplementedException if the object cannot be serialized
2611:             */
2612:            private void sendObjectToDriver(Object object) throws IOException,
2613:                    NotImplementedException {
2614:                if (object == null) { // Special tag for null objects (nothing to send)
2615:                    TypeTag.JAVA_NULL.sendToStream(out);
2616:                } else { // Regular object
2617:                    Serializer s = SQLDataSerialization.getSerializer(object);
2618:                    s.getTypeTag().sendToStream(out);
2619:                    s.sendToStream(object, out);
2620:                }
2621:            }
2622:
2623:            /**
2624:             * Retrieve the result from the request result failover cache for the given
2625:             * request id. If the result is not found, the scheduler is checked in case
2626:             * the query is currently executing. If the query is executing, we wait until
2627:             * it has completed and then return the result. If no result is found, null is
2628:             * returned.
2629:             * 
2630:             * @param requestId the request unique identifier
2631:             * @return the request result or null if not found
2632:             */
2633:            private Serializable getResultForRequestId(long requestId) {
2634:                waitForWritesFlushed(requestId);
2635:
2636:                Serializable result = ((DistributedVirtualDatabase) vdb)
2637:                        .getRequestResultFailoverCache().retrieve(requestId);
2638:
2639:                if (result == null) { // Check if query is not currently executing
2640:                    AbstractScheduler scheduler = vdb.getRequestManager()
2641:                            .getScheduler();
2642:                    if (scheduler.isActiveRequest(requestId)) {
2643:                        // Wait for request completion and then retrieve the result from the
2644:                        // failover cache.
2645:                        scheduler.waitForRequestCompletion(requestId);
2646:                        result = ((DistributedVirtualDatabase) vdb)
2647:                                .getRequestResultFailoverCache().retrieve(
2648:                                        requestId);
2649:                    }
2650:                }
2651:                return result;
2652:            }
2653:
2654:            /**
2655:             * Retrieve the result of a stored procedure that returns multiple results,
2656:             * out parameters and named parameters. Returns null to the driver if the
2657:             * result has not been found.
2658:             * 
2659:             * @throws IOException if an error occurs with the socket
2660:             * @throws SQLException if an error occurs while retrieving the result
2661:             */
2662:            private void retrieveExecuteResultWithParameters()
2663:                    throws IOException, SQLException {
2664:                if (logger.isDebugEnabled())
2665:                    logger
2666:                            .debug("Retrieve execute result with parameters command");
2667:
2668:                long requestId = in.readLong();
2669:
2670:                if (vdb.isDistributed()) {
2671:                    Serializable result = getResultForRequestId(requestId);
2672:
2673:                    if (result != null) {
2674:                        // Cache hit
2675:                        if (result instanceof  StoredProcedureCallResult) {
2676:                            StoredProcedureCallResult spResult = (StoredProcedureCallResult) result;
2677:
2678:                            // re-send statement warnings
2679:                            sendToDriver(((ExecuteResult) spResult.getResult())
2680:                                    .getStatementWarnings());
2681:                            // Send results first
2682:                            for (Iterator iter = ((ExecuteResult) spResult
2683:                                    .getResult()).getResults().iterator(); iter
2684:                                    .hasNext();) {
2685:                                Object element = iter.next();
2686:                                if (element instanceof  Integer) {
2687:                                    sendToDriver(false);
2688:                                    sendToDriver(((Integer) element).intValue());
2689:                                } else if (element instanceof  ControllerResultSet) {
2690:                                    sendToDriver(true);
2691:                                    sendToDriver((ControllerResultSet) element);
2692:                                } else
2693:                                    logger
2694:                                            .error("Unexpected result "
2695:                                                    + element
2696:                                                    + " in statementExecute for request "
2697:                                                    + requestId);
2698:                            }
2699:
2700:                            // Send parameters
2701:                            sendNamedAndOutParametersToDriver(spResult
2702:                                    .getStoredProcedure());
2703:                        } else
2704:                            throw new SQLException(
2705:                                    "Expected StoredProcedureCallResult for request "
2706:                                            + requestId + " failover but got "
2707:                                            + result);
2708:                    } else {
2709:                        // No cache hit
2710:                        sendToDriver((SQLWarning) null);
2711:                        sendToDriver(true);
2712:                        sendToDriver((ControllerResultSet) null);
2713:                    }
2714:                } else {
2715:                    throw new SQLException(
2716:                            "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2717:                }
2718:            }
2719:
2720:            /**
2721:             * Retrieve the result of a stored procedure that returns an int, out
2722:             * parameters and named parameters. Returns -1 to the driver if the result has
2723:             * not been found.
2724:             * 
2725:             * @throws IOException if an error occurs with the socket
2726:             * @throws SQLException if an error occurs while retrieving the result
2727:             */
2728:            private void retrieveExecuteUpdateResultWithParameters()
2729:                    throws IOException, SQLException {
2730:                if (logger.isDebugEnabled())
2731:                    logger
2732:                            .debug("Retrieve execute update with parameters command");
2733:
2734:                long requestId = in.readLong();
2735:
2736:                if (vdb.isDistributed()) {
2737:                    Serializable result = getResultForRequestId(requestId);
2738:
2739:                    if (result != null) {
2740:                        // Cache hit
2741:                        if (result instanceof  StoredProcedureCallResult) {
2742:                            StoredProcedureCallResult spResult = (StoredProcedureCallResult) result;
2743:                            // Send warnings
2744:                            ExecuteUpdateResult r = (ExecuteUpdateResult) spResult
2745:                                    .getResult();
2746:                            // re-send statement warnings
2747:                            sendToDriver(r.getStatementWarnings());
2748:                            // Send udpate count
2749:                            sendToDriver(r.getUpdateCount());
2750:                            // Send parameters
2751:                            sendNamedAndOutParametersToDriver(spResult
2752:                                    .getStoredProcedure());
2753:                        } else
2754:                            throw new SQLException(
2755:                                    "Expected StoredProcedureCallResult for request "
2756:                                            + requestId + " failover but got "
2757:                                            + result);
2758:                    } else {
2759:                        // No cache hit
2760:                        sendToDriver((SQLWarning) null);
2761:                        sendToDriver(-1);
2762:                    }
2763:                } else {
2764:                    throw new SQLException(
2765:                            "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2766:                }
2767:            }
2768:
2769:            /**
2770:             * Retrieve the result of a stored procedure that returns a ResultSet, out
2771:             * parameters and named parameters. Returns null to the driver if the result
2772:             * has not been found.
2773:             * 
2774:             * @throws IOException if an error occurs with the socket
2775:             * @throws SQLException if an error occurs while retrieving the result
2776:             */
2777:            private void retrieveExecuteQueryResultWithParameters()
2778:                    throws IOException, SQLException {
2779:                if (logger.isDebugEnabled())
2780:                    logger
2781:                            .debug("Retrieve execute update with parameters command");
2782:
2783:                long requestId = in.readLong();
2784:
2785:                if (vdb.isDistributed()) {
2786:                    Serializable result = getResultForRequestId(requestId);
2787:
2788:                    if (result != null) {
2789:                        // Cache hit
2790:                        if (result instanceof  StoredProcedureCallResult) {
2791:                            StoredProcedureCallResult spResult = (StoredProcedureCallResult) result;
2792:                            // re-send statement warnings
2793:                            sendToDriver(((ControllerResultSet) spResult
2794:                                    .getResult()).getStatementWarnings());
2795:                            // Send ResultSet
2796:                            sendToDriver((ControllerResultSet) spResult
2797:                                    .getResult());
2798:                            // Send parameters
2799:                            sendNamedAndOutParametersToDriver(spResult
2800:                                    .getStoredProcedure());
2801:                        } else
2802:                            throw new SQLException(
2803:                                    "Expected StoredProcedureCallResult for request "
2804:                                            + requestId + " failover but got "
2805:                                            + result);
2806:                    } else {
2807:                        // No cache hit
2808:                        sendToDriver((SQLWarning) null);
2809:                        sendToDriver((ControllerResultSet) null);
2810:                    }
2811:                } else {
2812:                    throw new SQLException(
2813:                            "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2814:                }
2815:            }
2816:
2817:            /**
2818:             * Retrieve the result of a write request that returns an int. Returns -1 to
2819:             * the driver if the result has not been found.
2820:             * 
2821:             * @throws IOException if an error occurs with the socket
2822:             * @throws SQLException if an error occurs while retrieving the result
2823:             */
2824:            private void retrieveExecuteUpdateResult() throws IOException,
2825:                    SQLException {
2826:                if (logger.isDebugEnabled())
2827:                    logger.debug("Retrieve execute update result command");
2828:
2829:                long requestId = in.readLong();
2830:
2831:                if (vdb.isDistributed()) {
2832:                    // Result will always be null since result is only stored in the recovery
2833:                    // log but this call ensures that the query execution has completed.
2834:                    getResultForRequestId(requestId);
2835:
2836:                    // We don't cache the warnings for write queries
2837:                    sendToDriver(new SQLWarning(
2838:                            Translate
2839:                                    .get(
2840:                                            "virtualdatabase.distributed.write.failover.lost.warnings", requestId))); //$NON-NLS-1$
2841:                    sendToDriver(vdb.getRequestManager().getRecoveryLog()
2842:                            .getUpdateCountResultForQuery(requestId));
2843:                } else {
2844:                    throw new SQLException(
2845:                            "Transparent failover for statements that return an update count is only supported in distributed configurations.");
2846:                }
2847:            }
2848:
2849:            /**
2850:             * Retrieve the result of a request that returns multiple results. Returns
2851:             * null to the driver if the result has not been found.
2852:             * 
2853:             * @throws IOException if an error occurs with the socket
2854:             * @throws SQLException if an error occurs while retrieving the result
2855:             */
2856:            private void retrieveExecuteResult() throws IOException,
2857:                    SQLException {
2858:                if (logger.isDebugEnabled())
2859:                    logger.debug("Retrieve execute result command");
2860:
2861:                long requestId = in.readLong();
2862:
2863:                if (vdb.isDistributed()) {
2864:                    Serializable result = getResultForRequestId(requestId);
2865:
2866:                    if (result != null) {
2867:                        // Cache hit
2868:                        // re-send warnings
2869:                        SQLWarning cachedWarns = ((ExecuteResult) result)
2870:                                .getStatementWarnings();
2871:                        sendToDriver(cachedWarns);
2872:                        // and result
2873:                        for (Iterator iter = ((ExecuteResult) result)
2874:                                .getResults().iterator(); iter.hasNext();) {
2875:                            Object element = iter.next();
2876:                            if (element instanceof  Integer) {
2877:                                sendToDriver(false);
2878:                                sendToDriver(((Integer) element).intValue());
2879:                            } else if (element instanceof  ControllerResultSet) {
2880:                                sendToDriver(true);
2881:                                sendToDriver((ControllerResultSet) element);
2882:                            } else
2883:                                logger.error("Unexpected result " + element
2884:                                        + " in statementExecute for request "
2885:                                        + requestId);
2886:                        }
2887:                    } else {
2888:                        try {
2889:                            // The query may have been remapped
2890:                            int updateCount = vdb.getRequestManager()
2891:                                    .getRecoveryLog()
2892:                                    .getUpdateCountResultForQuery(requestId);
2893:                            sendToDriver((SQLWarning) null);
2894:                            if (updateCount != -1) { // Build a response with the update count and add 'no more result'
2895:                                sendToDriver(false);
2896:                                sendToDriver(updateCount);
2897:                                sendToDriver(false);
2898:                                sendToDriver(-1);
2899:                            } else { // Not found
2900:                                sendToDriver(true);
2901:                                sendToDriver((ControllerResultSet) null);
2902:                            }
2903:                        } catch (SQLException ex) { // No cache hit
2904:                            sendToDriver((SQLWarning) null);
2905:                            sendToDriver(true);
2906:                            sendToDriver((ControllerResultSet) null);
2907:                        }
2908:                    }
2909:                } else {
2910:                    throw new SQLException(
2911:                            "Transparent failover for statements that potentially return multiple results is only supported in distributed configurations.");
2912:                }
2913:            }
2914:
2915:            /**
2916:             * Retrieve the result of a write request that returns an int and generated
2917:             * keys. Returns -1 to the driver if the result has not been found.
2918:             * 
2919:             * @throws IOException if an error occurs with the socket
2920:             * @throws SQLException if an error occurs while retrieving the result
2921:             */
2922:            private void retrieveExecuteUpdateWithKeysResult()
2923:                    throws IOException, SQLException {
2924:                if (logger.isDebugEnabled())
2925:                    logger
2926:                            .debug("Retrieve execute update with keys result command");
2927:
2928:                long requestId = in.readLong();
2929:
2930:                if (vdb.isDistributed()) {
2931:                    Serializable result = getResultForRequestId(requestId);
2932:
2933:                    if (result != null) {
2934:                        // Cache hit
2935:                        sendToDriver(((GeneratedKeysResult) result)
2936:                                .getStatementWarnings());
2937:                        sendToDriver(((GeneratedKeysResult) result)
2938:                                .getUpdateCount());
2939:                        sendToDriver(((GeneratedKeysResult) result)
2940:                                .getControllerResultSet());
2941:                    } else {
2942:                        // No cache hit
2943:                        sendToDriver((SQLWarning) null);
2944:                        sendToDriver(-1);
2945:                    }
2946:                } else {
2947:                    throw new SQLException(
2948:                            "Transparent failover for statements that return generated keys is only supported in distributed configurations.");
2949:                }
2950:            }
2951:
2952:            /**
2953:             * Retrieve the result of a request that returns a ResultSet. Returns null to
2954:             * the driver if the result has not been found.
2955:             * 
2956:             * @throws IOException if an error occurs with the socket
2957:             * @throws SQLException if an error occurs while retrieving the result
2958:             */
2959:            private void retrieveExecuteQueryResult() throws IOException,
2960:                    SQLException {
2961:                if (logger.isDebugEnabled())
2962:                    logger.debug("Retrieve execute query result command");
2963:
2964:                long requestId = in.readLong();
2965:
2966:                if (vdb.isDistributed()) {
2967:                    Serializable result = getResultForRequestId(requestId);
2968:
2969:                    if (result != null) {
2970:                        // Cache hit
2971:                        sendToDriver(((ControllerResultSet) result)
2972:                                .getStatementWarnings());
2973:                        sendToDriver((ControllerResultSet) result);
2974:                    } else {
2975:                        // No cache hit
2976:                        sendToDriver((SQLWarning) null);
2977:                        sendToDriver((ControllerResultSet) null);
2978:                    }
2979:                } else {
2980:                    throw new SQLException(
2981:                            "Transparent failover for statements that return a ResultSet is only supported in distributed configurations.");
2982:                }
2983:            }
2984:
2985:            /**
2986:             * Retrieve the result of a transaction commit. If the transaction was not
2987:             * commited, we commit it and acknowledge the driver back in all cases.
2988:             * 
2989:             * @throws IOException if an error occurs with the socket
2990:             * @throws SQLException if an error occurs while retrieving the result
2991:             */
2992:            private void retrieveCommitResult() throws IOException,
2993:                    SQLException {
2994:                if (logger.isDebugEnabled())
2995:                    logger.debug("Retrieve commit command");
2996:
2997:                waitForWritesFlushed(currentTid);
2998:
2999:                if (transactionHasAborted) {
3000:                    if (logger.isWarnEnabled()) {
3001:                        logger.warn("Transaction " + currentTid
3002:                                + " was aborted by database");
3003:                    }
3004:                    // mimic the behavior of the commit() method
3005:                    sendToDriver(currentTid);
3006:                    return;
3007:                }
3008:
3009:                boolean retry;
3010:                do {
3011:                    retry = false;
3012:                    String commitStatus = vdb.getRequestManager()
3013:                            .getRecoveryLog().getCommitStatusForTransaction(
3014:                                    currentTid);
3015:
3016:                    if (LogEntry.MISSING.equals(commitStatus)) {
3017:                        if (writeQueryExecutedInThisTransaction) {
3018:                            // Transaction was not commited yet, let's commit
3019:                            commit();
3020:                        } else {
3021:                            // If this was a read-only transaction, it was never started on this
3022:                            // controller and therefore there is no need to commit. It is ok to
3023:                            // let the user believe that the transaction successfully commited
3024:                            // since it was read-only and had no effect on the database anyway.
3025:                            // We have to tell the client that the transaction was committed
3026:                            // successfully anyway, otherwise the application will hang waiting
3027:                            // for this answer.
3028:                            sendToDriver(currentTid);
3029:                            resetTransactionState();
3030:                            return;
3031:                        }
3032:                    } else if (LogEntry.SUCCESS.equals(commitStatus)) {
3033:                        // Transaction was already commited, acknowledge the transaction id
3034:                        sendToDriver(currentTid);
3035:
3036:                        resetTransactionState();
3037:                    } else if (LogEntry.FAILED.equals(commitStatus)) {
3038:                        logger.warn("Commit of transaction " + currentTid
3039:                                + " failed");
3040:                        // commit failed
3041:                        throw new SQLException("Commit of transaction "
3042:                                + currentTid + " failed");
3043:                    } else {
3044:                        /*
3045:                         * Status is executing or unknown, if status is executing and we have
3046:                         * enabled backends locally, we have to wait for the final status to be
3047:                         * updated in the recovery log
3048:                         */
3049:
3050:                        retry = LogEntry.EXECUTING.equals(commitStatus)
3051:                                && (vdb.getRequestManager().getLoadBalancer()
3052:                                        .getNumberOfEnabledBackends() > 0);
3053:                        if (!retry)
3054:                            throw new SQLException("Commit of transaction "
3055:                                    + currentTid
3056:                                    + " is in unknown or executing state");
3057:                    }
3058:                } while (retry);
3059:
3060:            }
3061:
3062:            /**
3063:             * Retrieve the result of a transaction rollback. If the transaction was not
3064:             * rollbacked, we rollback and acknowledge the driver back in all cases.
3065:             * 
3066:             * @throws IOException if an error occurs with the socket
3067:             * @throws SQLException if an error occurs while retrieving the result
3068:             */
3069:            private void retrieveRollbackResult() throws IOException,
3070:                    SQLException {
3071:                if (logger.isDebugEnabled())
3072:                    logger.debug("Retrieve rollback command");
3073:
3074:                waitForWritesFlushed(currentTid);
3075:
3076:                if (!transactionHasAborted) {
3077:                    String rollbackStatus = vdb.getRequestManager()
3078:                            .getRecoveryLog().getRollbackStatusForTransaction(
3079:                                    currentTid);
3080:
3081:                    if (LogEntry.MISSING.equals(rollbackStatus))
3082:                        // Transaction was not rollbacked yet, let's do it
3083:                        rollback();
3084:                    else if (LogEntry.SUCCESS.equals(rollbackStatus)
3085:                            || LogEntry.FAILED.equals(rollbackStatus)) {
3086:                        // Transaction was already rollbacked, acknowledge the transaction id
3087:                        sendToDriver(currentTid);
3088:                        resetTransactionState();
3089:                    } else { // UNKOWN OR EXECUTING state
3090:                        if (vdb.isDistributed()) {
3091:                            ((DistributedRequestManager) vdb
3092:                                    .getRequestManager())
3093:                                    .cleanupRollbackFromOtherController(currentTid);
3094:                            sendToDriver(currentTid);
3095:                            resetTransactionState();
3096:                        } else {
3097:                            // rollback cannot fail locally, so notify right away even if the
3098:                            // rollback has not fully completed
3099:                            sendToDriver(currentTid);
3100:                            resetTransactionState();
3101:                        }
3102:                    }
3103:                } else {
3104:                    if (logger.isWarnEnabled()) {
3105:                        logger.warn("Transaction " + currentTid
3106:                                + " was aborted by database");
3107:                    }
3108:                    // mimic the behavior of the rollback() method
3109:                    sendToDriver(currentTid);
3110:                }
3111:            }
3112:
3113:            private void waitForWritesFlushed(long requestIdOrTransactionId) {
3114:                // In non-distributed configuration, there is no failover cache, so there
3115:                // is no need to wait
3116:                if (!vdb.isDistributed())
3117:                    return;
3118:
3119:                DistributedVirtualDatabase dvdb = (DistributedVirtualDatabase) vdb;
3120:                dvdb
3121:                        .flushGroupCommunicationMessagesLocally(requestIdOrTransactionId
3122:                                & DistributedRequestManager.CONTROLLER_ID_BIT_MASK);
3123:                dvdb
3124:                        .waitForGroupCommunicationMessagesLocallyFlushed(requestIdOrTransactionId
3125:                                & DistributedRequestManager.CONTROLLER_ID_BIT_MASK);
3126:            }
3127:
3128:            /**
3129:             * Serialize a ControllerResultSet answer, prefixed with the appropriate
3130:             * TypeTag. Note that this will be deserialized in a DriverResultSet.
3131:             * 
3132:             * @param crs the resultset to send
3133:             * @throws IOException stream error
3134:             */
3135:            private void sendToDriver(ControllerResultSet crs)
3136:                    throws IOException {
3137:                /**
3138:                 * If a (buggy) backend was returning a null ResultSet, we would have failed
3139:                 * much earlier in
3140:                 * {@link ControllerResultSet#ControllerResultSet(AbstractRequest, java.sql.ResultSet, MetadataCache, Statement, boolean)
3141:                 */
3142:                /*
3143:                 * So we can safely use "null" as a special value during transparent
3144:                 * failover when the controller's request cache hasn't found a result for a
3145:                 * given request.
3146:                 */
3147:                if (null == crs) {
3148:                    TypeTag.NULL_RESULTSET.sendToStream(out);
3149:                    out.flush();
3150:                    return;
3151:                }
3152:
3153:                try {
3154:                    crs.initSerializers();
3155:                } catch (NotImplementedException nie) { // we don't know how to serialize something
3156:                    sendToDriver(nie);
3157:                    return;
3158:                }
3159:
3160:                TypeTag.RESULTSET.sendToStream(out);
3161:                crs.sendToStream(out);
3162:            }
3163:
3164:            /**
3165:             * Send a protocol String, prefixed with the appropriate TypeTag
3166:             */
3167:            private void sendToDriver(String str) throws IOException {
3168:                TypeTag.NOT_EXCEPTION.sendToStream(out);
3169:                out.writeLongUTF(str);
3170:                out.flush();
3171:            }
3172:
3173:            /**
3174:             * Send a protocol boolean, prefixed with the appropriate TypeTag
3175:             */
3176:            private void sendToDriver(boolean b) throws IOException {
3177:                TypeTag.NOT_EXCEPTION.sendToStream(out);
3178:                out.writeBoolean(b);
3179:                out.flush();
3180:            }
3181:
3182:            /**
3183:             * Send a protocol int, prefixed with the appropriate TypeTag
3184:             */
3185:            private void sendToDriver(int i) throws IOException {
3186:                TypeTag.NOT_EXCEPTION.sendToStream(out);
3187:                out.writeInt(i);
3188:                out.flush();
3189:            }
3190:
3191:            /**
3192:             * Send a protocol long, prefixed with the appropriate TypeTag
3193:             */
3194:            private void sendToDriver(long l) throws IOException {
3195:                TypeTag.NOT_EXCEPTION.sendToStream(out);
3196:                out.writeLong(l);
3197:                out.flush();
3198:            }
3199:
3200:            private void sendToDriver(SQLWarning s) throws IOException {
3201:                if (s != null) {
3202:                    sendToDriver(true);
3203:                    TypeTag.BACKEND_EXCEPTION.sendToStream(out);
3204:                    new BackendDriverException(s).sendToStream(out);
3205:                } else
3206:                    sendToDriver(false);
3207:            }
3208:
3209:            private void sendToDriver(Exception e) throws IOException {
3210:                TypeTag.EXCEPTION.sendToStream(out);
3211:                // This is the place where we convert Exceptions to something
3212:                // serializable and that the driver can understand
3213:                // So this is the place where it's possible to trap all unknown exceptions
3214:
3215:                if (e instanceof  SQLException) { // we assume that an SQLexception comes from the backend
3216:
3217:                    // since this is currently false because some ControllerCoreExceptions
3218:                    // subclass SQLException, here are a few workarounds
3219:                    if (e instanceof  NoMoreBackendException
3220:                            || e instanceof  NoMoreControllerException
3221:                            || e instanceof  NotImplementedException) {
3222:                        TypeTag.CORE_EXCEPTION.sendToStream(out);
3223:                        new ControllerCoreException(e).sendToStream(out);
3224:                        return;
3225:                    }
3226:
3227:                    // non-workaround, regular SQLException from backend
3228:                    TypeTag.BACKEND_EXCEPTION.sendToStream(out);
3229:                    new BackendDriverException(e).sendToStream(out);
3230:                    return;
3231:                }
3232:
3233:                // else we assume this is an exception from the core (currently...?)
3234:                TypeTag.CORE_EXCEPTION.sendToStream(out);
3235:                new ControllerCoreException(e).sendToStream(out);
3236:                return;
3237:
3238:            }
3239:
3240:            /**
3241:             * Implements streaming: send the next ResultSet chunk to driver, pulling it
3242:             * from ControllerResultSet. The driver decides of the chunk size at each
3243:             * call. Note that virtualdatabase streaming is independent from backend
3244:             * streaming (which may not be supported). They even could be configured with
3245:             * two different fetchSize -s (it's not currently the case).
3246:             * <p>
3247:             * This is a real issue: in case of a low fetchsize hint ignored by the driver
3248:             * of the backend, then the whole backend resultset stays in the memory on the
3249:             * controller. And we probably cannot know how many rows did it pulled out.
3250:             * 
3251:             * @see org.continuent.sequoia.controller.loadbalancer.AbstractLoadBalancer#executeStatementExecuteQueryOnBackend(SelectRequest,
3252:             *      org.continuent.sequoia.controller.backend.DatabaseBackend,
3253:             *      org.continuent.sequoia.controller.backend.BackendWorkerThread,
3254:             *      Connection, org.continuent.sequoia.controller.backend.MetadataCache)
3255:             * @see ControllerResultSet#fetchData(int)
3256:             */
3257:            private void fetchNextResultSetRows() throws IOException,
3258:                    SQLException {
3259:                if (logger.isDebugEnabled())
3260:                    logger.debug("FetchNextResultSetRows command");
3261:
3262:                String cursorName = in.readLongUTF();
3263:                int fetchSize = in.readInt();
3264:                ControllerResultSet crs = (ControllerResultSet) streamedResultSets
3265:                        .get(cursorName);
3266:                if (crs == null) {
3267:                    sendToDriver(new SQLException(
3268:                            "No valid ControllerResultSet to fetch data from"));
3269:                } else {
3270:                    // refresh ControllerResultSet with a new chunk of rows
3271:                    crs.fetchData(fetchSize);
3272:
3273:                    // send it
3274:                    TypeTag.NOT_EXCEPTION.sendToStream(out);
3275:                    crs.sendRowsToStream(out);
3276:
3277:                    // At this point we could probably data.clear() already sent as a memory
3278:                    // optimization, but still in doubt about others using it we leave it as
3279:                    // is.
3280:
3281:                    if (!crs.hasMoreData())
3282:                        streamedResultSets.remove(cursorName);
3283:                }
3284:            }
3285:
3286:            //
3287:            // Public API
3288:            //
3289:
3290:            /**
3291:             * Return the current transaction id (should be 0 if not in a transaction).
3292:             * 
3293:             * @return the current transaction id
3294:             */
3295:            public long getCurrentTransactionId() {
3296:                return currentTid;
3297:            }
3298:
3299:            /**
3300:             * Return the persistent connection id.
3301:             * 
3302:             * @return the persistent connection id
3303:             */
3304:            public long getPersistentConnectionId() {
3305:                return persistentConnectionId;
3306:            }
3307:
3308:            /**
3309:             * Get time active
3310:             * 
3311:             * @return time active since started
3312:             */
3313:            public long getTimeActive() {
3314:                return ((System.currentTimeMillis() - in.getDateCreated()) / 1000);
3315:            }
3316:
3317:            /**
3318:             * @return Returns the login of the current user.
3319:             */
3320:            public String getUser() {
3321:                if (user == null) {
3322:                    return "No user connected";
3323:                }
3324:                return user.getLogin();
3325:            }
3326:
3327:            //
3328:            // Public API
3329:            //
3330:
3331:            /**
3332:             * Notify the abort of the given transaction which should match the current
3333:             * transaction id of this thread else an exception will be thrown.
3334:             * 
3335:             * @param tid the transaction identifier to abort
3336:             * @throws SQLException if the tid does not correspond to the current
3337:             *           transaction id of this thread or if the abort throws a
3338:             *           SQLException
3339:             */
3340:            public void notifyAbort(long tid) throws SQLException {
3341:                synchronized (this ) {
3342:                    if ((!transactionStarted) || (currentTid != tid))
3343:                        throw new SQLException(
3344:                                "Cannot abort transaction "
3345:                                        + tid
3346:                                        + " since current worker thread is assigned to transaction "
3347:                                        + currentTid);
3348:
3349:                    transactionHasAborted = true;
3350:                }
3351:            }
3352:
3353:            /**
3354:             * Notify the closing of the given persistent connection which should match
3355:             * the current persistent connection id of this thread else an exception will
3356:             * be thrown.
3357:             * 
3358:             * @param tid the persistent connection identifier to abort
3359:             */
3360:            public void notifyClose(long persistentConnectionId) {
3361:                synchronized (this ) {
3362:                    if ((!persistentConnection)
3363:                            || (this .persistentConnectionId != persistentConnectionId))
3364:                        logger
3365:                                .warn("Cannot notify closing of persistent connection "
3366:                                        + persistentConnectionId
3367:                                        + " since current worker thread is assigned to persistent connection "
3368:                                        + this .persistentConnectionId);
3369:
3370:                    connectionHasClosed = true;
3371:                }
3372:            }
3373:
3374:            /**
3375:             * Retrieve general information on this client
3376:             * 
3377:             * @return an array of string
3378:             */
3379:            public String[] retrieveClientData() {
3380:                String[] data = new String[4];
3381:                data[0] = in.getSocket().getInetAddress().getHostName();
3382:                data[1] = in.getSocket().getInetAddress().getHostAddress();
3383:                data[2] = String.valueOf(((System.currentTimeMillis() - in
3384:                        .getDateCreated()) / 1000));
3385:                return data;
3386:            }
3387:
3388:            /**
3389:             * Shutdown this thread by setting <code>isKilled</code> value to true. This
3390:             * gives time to check for needed rollback transactions
3391:             */
3392:            public void shutdown() {
3393:                // Tell this thread to stop working gently.
3394:                // This will cancel transaction if needed
3395:                this .isKilled = true;
3396:                try {
3397:                    if (waitForCommand) {
3398:                        // close only the streams if we're not in the middle of a request
3399:                        in.close();
3400:                        out.close();
3401:                    }
3402:                } catch (IOException e) {
3403:                    // ignore, only the input stream should be close
3404:                    // for this thread to end
3405:                }
3406:            }
3407:
3408:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.