Source code for DistributedVirtualDatabase.java (sequoia 2.10.9, package org.continuent.sequoia.controller.virtualdatabase)

/**
 * Sequoia: Database clustering technology.
 * Copyright (C) 2002-2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
 * Copyright (C) 2005-2006 Continuent, Inc.
 * Contact: sequoia@continuent.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s): Olivier Fambon, Damian Arregui, Karl Cassaigne.
 */

package org.continuent.sequoia.controller.virtualdatabase;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Map.Entry;

import org.continuent.hedera.adapters.MessageListener;
import org.continuent.hedera.adapters.MulticastRequestAdapter;
import org.continuent.hedera.adapters.MulticastRequestListener;
import org.continuent.hedera.adapters.MulticastResponse;
import org.continuent.hedera.channel.AbstractReliableGroupChannel;
import org.continuent.hedera.channel.ChannelException;
import org.continuent.hedera.channel.NotConnectedException;
import org.continuent.hedera.common.Group;
import org.continuent.hedera.common.GroupIdentifier;
import org.continuent.hedera.common.IpAddress;
import org.continuent.hedera.common.Member;
import org.continuent.hedera.factory.AbstractGroupCommunicationFactory;
import org.continuent.hedera.gms.AbstractGroupMembershipService;
import org.continuent.hedera.gms.GroupMembershipListener;
import org.continuent.sequoia.common.exceptions.ControllerException;
import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
import org.continuent.sequoia.common.exceptions.SequoiaException;
import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
import org.continuent.sequoia.common.i18n.Translate;
import org.continuent.sequoia.common.jmx.management.BackendInfo;
import org.continuent.sequoia.common.jmx.management.DumpInfo;
import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList;
import org.continuent.sequoia.common.log.Trace;
import org.continuent.sequoia.common.sql.metadata.MetadataContainer;
import org.continuent.sequoia.common.sql.schema.DatabaseSchema;
import org.continuent.sequoia.common.users.VirtualDatabaseUser;
import org.continuent.sequoia.common.util.Constants;
import org.continuent.sequoia.common.xml.DatabasesXmlTags;
import org.continuent.sequoia.controller.backend.DatabaseBackend;
import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
import org.continuent.sequoia.controller.backup.DumpTransferInfo;
import org.continuent.sequoia.controller.core.Controller;
import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
import org.continuent.sequoia.controller.requestmanager.RequestManager;
import org.continuent.sequoia.controller.requestmanager.distributed.ControllerFailureCleanupThread;
import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
import org.continuent.sequoia.controller.requests.AbstractRequest;
import org.continuent.sequoia.controller.virtualdatabase.activity.ActivityService;
import org.continuent.sequoia.controller.virtualdatabase.management.RestoreLogOperation;
import org.continuent.sequoia.controller.virtualdatabase.management.TransferBackendOperation;
import org.continuent.sequoia.controller.virtualdatabase.management.TransferDumpOperation;
import org.continuent.sequoia.controller.virtualdatabase.protocol.AddVirtualDatabaseUser;
import org.continuent.sequoia.controller.virtualdatabase.protocol.BackendStatus;
import org.continuent.sequoia.controller.virtualdatabase.protocol.BackendTransfer;
import org.continuent.sequoia.controller.virtualdatabase.protocol.CompleteRecoveryLogResync;
import org.continuent.sequoia.controller.virtualdatabase.protocol.ControllerInformation;
import org.continuent.sequoia.controller.virtualdatabase.protocol.CopyLogEntry;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DisableBackendsAndSetCheckpoint;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRequest;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedTransactionMarker;
import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage;
import org.continuent.sequoia.controller.virtualdatabase.protocol.FlushGroupCommunicationMessages;
import org.continuent.sequoia.controller.virtualdatabase.protocol.GetPreparedStatementMetadata;
import org.continuent.sequoia.controller.virtualdatabase.protocol.GetStaticMetadata;
import org.continuent.sequoia.controller.virtualdatabase.protocol.InitiateDumpCopy;
import org.continuent.sequoia.controller.virtualdatabase.protocol.IsValidUserForAllBackends;
import org.continuent.sequoia.controller.virtualdatabase.protocol.MessageTimeouts;
import org.continuent.sequoia.controller.virtualdatabase.protocol.RemoveVirtualDatabaseUser;
import org.continuent.sequoia.controller.virtualdatabase.protocol.ReplicateLogEntries;
import org.continuent.sequoia.controller.virtualdatabase.protocol.ResumeActivity;
import org.continuent.sequoia.controller.virtualdatabase.protocol.ResyncRecoveryLog;
import org.continuent.sequoia.controller.virtualdatabase.protocol.SuspendWritesMessage;
import org.continuent.sequoia.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration;
import org.continuent.sequoia.controller.virtualdatabase.protocol.VirtualDatabaseConfigurationResponse;

/**
 * A <code>DistributedVirtualDatabase</code> is a virtual database hosted by
 * several controllers. Communication between the controllers is achieved with
 * reliable multicast provided by the Hedera group communication layer.
 *
 * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
 * @author <a href="mailto:Damian.Arregui@continuent.com">Damian Arregui</a>
 * @version 1.0
 */
public class DistributedVirtualDatabase extends VirtualDatabase
        implements MessageListener, MulticastRequestListener,
        GroupMembershipListener {
    //
    // How is the code organized?
    //
    // 1. Member variables
    // 2. Constructor(s)
    // 3. Request handling
    // 4. Transaction handling
    // 5. Database backend management
    // 6. Distribution management (multicast)
    // 7. Getter/Setter (possibly in alphabetical order)
    //

    // Distribution

    /** Group name */
    private String groupName = null;

    /**
     * Hashtable&lt;Member,String&gt;, the String being the controller JMX name
     * corresponding to the Member
     */
    private Hashtable controllerJmxAddress;

    /** Hashtable&lt;Member, Long&gt;, the Long being the controller identifier */
    private Hashtable controllerIds;

    /** Hashtable&lt;Member, List&lt;DatabaseBackend&gt;&gt; */
    private Hashtable backendsPerController;

    /** Hedera channel */
    private AbstractReliableGroupChannel channel = null;

    /** Multicast request adapter used to communicate with the group */
    private MulticastRequestAdapter multicastRequestAdapter = null;

    private MessageTimeouts messageTimeouts;

    private Group currentGroup = null;

    private ArrayList allMemberButUs = null;

    /**
     * Used by VirtualDatabaseConfiguration if a remote controller config is not
     * compatible
     */
    public static final long INCOMPATIBLE_CONFIGURATION = -1;

    private boolean isVirtualDatabaseStarted = false;

    /**
     * Our view of the request manager: the same object as super.requestManager,
     * but typed as a DistributedRequestManager.
     */
    private DistributedRequestManager distributedRequestManager;

    private boolean processMacroBeforeBroadcast;

    /**
     * Threads that are cleaning up resources allocated by a dead remote
     * controller
     */
    private Hashtable cleanupThreads;

    /**
     * Stores the "flushed writes" status for each failed controller: true if
     * writes have been flushed by the controller failure clean-up thread or a
     * vdb worker thread, false otherwise.
     */
    private HashMap groupCommunicationMessagesLocallyFlushed;

    /**
     * Maximum time in ms allowed for clients to failover in case of a controller
     * failure
     */
    private long failoverTimeoutInMs;

    /**
     * Cache of request results to allow transparent failover if a failure occurs
     * during a write request.
     */
    private RequestResultFailoverCache requestResultFailoverCache;

    /** Logger for distributed request execution */
    private Trace distributedRequestLogger;

    private String hederaPropertiesFile;

    /**
     * Lock guarding messagesInHandlers; also returned by
     * handleMessageSingleThreaded as a marker when the channel is shutting down.
     */
    private final Object MESSAGES_IN_HANDLER_SYNC = new Object();

    private int messagesInHandlers = 0;

    private boolean channelShuttingDown = false;

    private boolean isResynchingFlag;

    private Hashtable controllerPersistentConnectionsRecovered = new Hashtable();

    private Hashtable controllerTransactionsRecovered = new Hashtable();
    private PartitionReconciler partitionReconciler;

    private List ongoingActivitySuspensions = new ArrayList();

    /** JVM-wide group communication factory */
    private static AbstractGroupCommunicationFactory groupCommunicationFactory = null;

    /**
     * Creates a new <code>DistributedVirtualDatabase</code> instance.
     *
     * @param controller the controller we belong to
     * @param name the virtual database name
     * @param groupName the virtual database group name
     * @param maxConnections maximum number of concurrent connections.
     * @param pool should we use a pool of threads for handling connections?
     * @param minThreads minimum number of threads in the pool
     * @param maxThreads maximum number of threads in the pool
     * @param maxThreadIdleTime maximum time a thread can remain idle before being
     *          removed from the pool.
     * @param sqlShortFormLength maximum number of characters of an SQL statement
     *          to display in traces or exceptions
     * @param clientFailoverTimeoutInMs maximum time for clients to failover in
     *          case of a controller failure
     * @param useStaticResultSetMetaData true if DatabaseResultSetMetaData should
     *          use static fields, false if it should try to fetch the metadata
     *          from the underlying database
     * @param hederaPropertiesFile Hedera properties file that defines the group
     *          communication factory and its parameters
     * @param enforceTableExistenceIntoSchema forwarded unchanged to the
     *          <code>VirtualDatabase</code> constructor
     */
    public DistributedVirtualDatabase(Controller controller,
            String name, String groupName, int maxConnections,
            boolean pool, int minThreads, int maxThreads,
            long maxThreadIdleTime, int sqlShortFormLength,
            long clientFailoverTimeoutInMs,
            boolean useStaticResultSetMetaData,
            String hederaPropertiesFile,
            boolean enforceTableExistenceIntoSchema) {
        super(controller, name, maxConnections, pool, minThreads,
                maxThreads, maxThreadIdleTime, sqlShortFormLength,
                useStaticResultSetMetaData,
                enforceTableExistenceIntoSchema);

        this.groupName = groupName;
        this.processMacroBeforeBroadcast = true;
        this.failoverTimeoutInMs = clientFailoverTimeoutInMs;
        requestResultFailoverCache = new RequestResultFailoverCache(
                logger, failoverTimeoutInMs);
        backendsPerController = new Hashtable();
        controllerJmxAddress = new Hashtable();
        controllerIds = new Hashtable();
        cleanupThreads = new Hashtable();
        groupCommunicationMessagesLocallyFlushed = new HashMap();
        isVirtualDatabaseStarted = false;
        distributedRequestLogger = Trace
                .getLogger("org.continuent.sequoia.controller.distributedvirtualdatabase.request."
                        + name);
        this.hederaPropertiesFile = hederaPropertiesFile;
        this.totalOrderQueue = new LinkedList();
    }

    /**
     * Disconnect the channel and close it.
     *
     * @see java.lang.Object#finalize()
     */
    protected void finalize() throws Throwable {
        quitChannel();
        super.finalize();
    }

    /**
     * This method handles the scheduling part of the queries, to be sure that a
     * query is scheduled in total order before other queries are allowed to
     * execute. If the channel is shutting down, the MESSAGES_IN_HANDLER_SYNC
     * marker is returned instead of a result.
     *
     * @see org.continuent.hedera.adapters.MulticastRequestListener#handleMessageSingleThreaded(java.io.Serializable,
     *      org.continuent.hedera.common.Member)
     */
    public Object handleMessageSingleThreaded(Serializable msg,
            Member sender) {
        synchronized (MESSAGES_IN_HANDLER_SYNC) {
            if (channelShuttingDown)
                return MESSAGES_IN_HANDLER_SYNC;
            messagesInHandlers++;
        }

        try {
            if (msg != null) {
                if (logger.isDebugEnabled())
                    logger.debug("handleMessageSingleThreaded ("
                            + msg.getClass() + "): " + msg);

                if (msg instanceof DistributedVirtualDatabaseMessage) {
                    return ((DistributedVirtualDatabaseMessage) msg)
                            .handleMessageSingleThreaded(this, sender);
                }
                // Other message types will be handled in multithreaded handler
            } else {
                String errorMsg = "Invalid null message";
                logger.error(errorMsg);
                return new ControllerException(errorMsg);
            }

            return null;
        } catch (Exception e) {
            if (e instanceof RuntimeException)
                logger.warn("Error while handling group message: "
                        + msg.getClass(), e);
            return e;
        }
    }

    /**
     * @see org.continuent.hedera.adapters.MulticastRequestListener#handleMessageMultiThreaded(java.io.Serializable,
     *      org.continuent.hedera.common.Member, java.lang.Object)
     */
    public Serializable handleMessageMultiThreaded(Serializable msg,
            Member sender, Object handleMessageSingleThreadedResult) {
        // Check if we are shutting down first. In that case the single-threaded
        // handler returned MESSAGES_IN_HANDLER_SYNC as its result without
        // incrementing messagesInHandlers, so we must return before entering
        // the try/finally block that decrements the counter.
        if (handleMessageSingleThreadedResult == MESSAGES_IN_HANDLER_SYNC)
            return null;

        try {
            if (msg == null) {
                String errorMsg = "Invalid null message";
                logger.error(errorMsg);
                return new ControllerException(errorMsg);
            }

            if (logger.isDebugEnabled())
                logger.debug("handleMessageMultiThreaded ("
                        + msg.getClass() + "): " + msg);
            if (msg instanceof DistributedVirtualDatabaseMessage) {
                return ((DistributedVirtualDatabaseMessage) msg)
                        .handleMessageMultiThreaded(this, sender,
                                handleMessageSingleThreadedResult);
            } else
                logger.warn("Unhandled message type received: "
                        + msg.getClass() + "(" + msg + ")");

            return null;
        } catch (Exception e) {
            if (e instanceof RuntimeException)
                logger.warn("Error while handling group message: "
                        + msg.getClass(), e);
            return e;
        } finally {
            synchronized (MESSAGES_IN_HANDLER_SYNC) {
                messagesInHandlers--;
                if (messagesInHandlers <= 0)
                    MESSAGES_IN_HANDLER_SYNC.notifyAll();
            }

            if (msg != null) {
                // Just in case something bad happened and the request was not
                // properly removed from the queue.
                if (msg instanceof DistributedRequest) {
                    synchronized (totalOrderQueue) {
                        if (totalOrderQueue.remove(
                                ((DistributedRequest) msg).getRequest())) {
                            logger.warn("Distributed request "
                                    + ((DistributedRequest) msg).getRequest()
                                            .getSqlShortForm(getSqlShortFormLength())
                                    + " did not remove itself from the total order queue");
                            totalOrderQueue.notifyAll();
                        }
                    }
                } else if (msg instanceof DistributedTransactionMarker) {
                    synchronized (totalOrderQueue) {
                        if (totalOrderQueue.remove(msg)) {
                            logger.warn("Distributed " + msg.toString()
                                    + " did not remove "
                                    + "itself from the total order queue");
                            totalOrderQueue.notifyAll();
                        }
                    }
                }
            }
        }
    }

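    /**
     * Cancels a message by delegating to the message's own <code>cancel</code>
     * method. Messages that are not
     * <code>DistributedVirtualDatabaseMessage</code> instances are logged and
     * ignored.
     *
     * @param msg the message to cancel
     */
    public void cancelMessage(Serializable msg) {
        if (msg instanceof DistributedVirtualDatabaseMessage) {
            ((DistributedVirtualDatabaseMessage) msg).cancel(this);
        } else
            logger.warn("Unhandled message type received: "
                    + msg.getClass() + "(" + msg + ")");
    }
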
    /**
     * Flushes all messages in the group communication locally, i.e. ensures that
     * all previously scheduled messages are processed.
     *
     * @param failedControllerId controller whose suspected failure triggered the
     *          flush operation.
     */
    public void flushGroupCommunicationMessagesLocally(
            long failedControllerId) {
        // Set "writes flushed" flag to false, unless a status is already
        // recorded for this controller
        synchronized (groupCommunicationMessagesLocallyFlushed) {
            if (!groupCommunicationMessagesLocallyFlushed
                    .containsKey(new Long(failedControllerId)))
                groupCommunicationMessagesLocallyFlushed.put(new Long(
                        failedControllerId), Boolean.FALSE);
        }

        try {
            List dest = new ArrayList();

            if ((multicastRequestAdapter != null)
                    && (multicastRequestAdapter.getChannel() != null)) {
                // Send the flush message to ourselves only
                dest.add(multicastRequestAdapter.getChannel()
                        .getLocalMembership());

                // Don't care about result
                multicastRequestAdapter.multicastMessage(dest,
                        new FlushGroupCommunicationMessages(
                                failedControllerId),
                        MulticastRequestAdapter.WAIT_ALL, 0);
            }
        } catch (Throwable e) {
            logger.error(
                    "Failed to flush group communication messages locally",
                    e);
        } finally {
            // Set "writes flushed" flag to true and notify blocked vdb worker
            // threads
            synchronized (groupCommunicationMessagesLocallyFlushed) {
                groupCommunicationMessagesLocallyFlushed.put(new Long(
                        failedControllerId), Boolean.TRUE);
                groupCommunicationMessagesLocallyFlushed.notifyAll();
            }
        }
    }

    /**
     * Waits for all messages in the group communication to be flushed locally.
     *
     * @param failedControllerId controller whose suspected failure triggered the
     *          wait operation.
     */
    public void waitForGroupCommunicationMessagesLocallyFlushed(
            long failedControllerId) {
        Long controllerIdKey = new Long(failedControllerId);

        // Set "writes flushed" flag to false if no status is recorded yet, then
        // wait for writes to be flushed by the controller failure clean-up
        // thread or the vdb worker thread
        synchronized (groupCommunicationMessagesLocallyFlushed) {
            if (!groupCommunicationMessagesLocallyFlushed
                    .containsKey(controllerIdKey))
                groupCommunicationMessagesLocallyFlushed.put(
                        controllerIdKey, Boolean.FALSE);

            while (!((Boolean) groupCommunicationMessagesLocallyFlushed
                    .get(controllerIdKey)).booleanValue()) {
                try {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Will wait for writes to be flushed for failed controller "
                                + controllerIdKey);
                    }
                    groupCommunicationMessagesLocallyFlushed.wait();
                } catch (InterruptedException e) {
                    // Ignore exception
                }
            }
        }
    }

    /**
     * @see org.continuent.hedera.gms.GroupMembershipListener#failedMember(org.continuent.hedera.common.Member,
     *      org.continuent.hedera.common.GroupIdentifier,
     *      org.continuent.hedera.common.Member)
     */
    public void failedMember(Member failed, GroupIdentifier gid,
            Member sender) {
        quitMember(failed, gid);
    }

    /**
     * @see org.continuent.hedera.gms.GroupMembershipListener#groupComposition(org.continuent.hedera.common.Group,
     *      org.continuent.hedera.common.IpAddress, int)
     */
    public void groupComposition(Group g, IpAddress sender,
            int gmsStatus) {
        // Just ignore
    }

    /**
     * @see org.continuent.hedera.gms.GroupMembershipListener#networkPartition(org.continuent.hedera.common.GroupIdentifier,
     *      java.util.List)
     */
    public void networkPartition(GroupIdentifier gid,
            final List mergedGroupCompositions) {
        // We already left the group. Late notification. Ignore.
        if ((channel == null) || channelShuttingDown)
            return;

        if (gid.equals(currentGroup.getGroupIdentifier())) {
            String msg = "Network partition detected in group " + gid
                    + ".";
            logger.error(msg);
            endUserLogger.error(msg);

            if (shuttingDown) {
                logger.warn("Do not start network reconciliation process since "
                        + name + " is shutting down");
                return;
            }

            sendJmxNotification(
                    SequoiaNotificationList.VDB_NETWORK_PARTITION_DETECTION,
                    "A network partition has been detected for the virtual database "
                            + name + " (gid=" + gid + ").");
            Thread t = new Thread() {
                public void run() {
                    reconcile(mergedGroupCompositions);
                }
            };
            t.start();
        }
    }

0548:            /**
0549:             * Makes this virtual database join a virtual database group. Those groups are
0550:             * mapped to Hedera groups.
0551:             * @param resyncRecoveryLog true if the recovery log must be resynchronized
0552:             * @exception Exception if an error occurs
0553:             */
0554:            public void joinGroup(boolean resyncRecoveryLog) throws Exception {
0555:                RecoveryLog recoveryLog = distributedRequestManager
0556:                        .getRecoveryLog();
0557:                if (recoveryLog == null) {
0558:                    String msg = "Distributed virtual database cannot be used without a recovery log defined.";
0559:                    if (logger.isFatalEnabled())
0560:                        logger.fatal(msg);
0561:                    throw new SequoiaException(msg);
0562:                }
0563:
0564:                logger.info("Recovery log size: "
0565:                        + recoveryLog.getRecoveryLogSize());
0566:
0567:                try {
0568:                    Properties p = new Properties();
0569:                    InputStream is = this.getClass().getResourceAsStream(
0570:                            hederaPropertiesFile);
0571:                    if (is == null) {
0572:                        if (logger.isFatalEnabled())
0573:                            logger
0574:                                    .fatal(Translate
0575:                                            .get(
0576:                                                    "fatal.distributed.no.group.communication.properties",
0577:                                                    hederaPropertiesFile));
0578:                        endUserLogger
0579:                                .fatal(Translate
0580:                                        .get(
0581:                                                "fatal.distributed.no.group.communication.properties",
0582:                                                hederaPropertiesFile));
0583:                        throw new SequoiaException(
0584:                                "Join group failed because Hedera properties file was not found.");
0585:                    }
0586:                    if (logger.isInfoEnabled())
0587:                        logger.info("Using Hedera properties file: "
0588:                                + hederaPropertiesFile);
0589:                    p.load(is);
0590:                    is.close();
0591:
0592:                    if (groupCommunicationFactory == null) {
0593:                        groupCommunicationFactory = (AbstractGroupCommunicationFactory) Class
0594:                                .forName(p.getProperty("hedera.factory"))
0595:                                .newInstance();
0596:                    }
0597:                    Object[] ret = groupCommunicationFactory
0598:                            .createChannelAndGroupMembershipService(p,
0599:                                    new GroupIdentifier(groupName));
0600:                    AbstractGroupMembershipService gms = (AbstractGroupMembershipService) ret[1];
0601:                    gms.registerGroupMembershipListener(this);
0602:                    channel = (AbstractReliableGroupChannel) ret[0];
0603:
0604:                    if (logger.isDebugEnabled())
0605:                        logger
0606:                                .debug("Group communication channel is configured as follows: "
0607:                                        + channel);
0608:
0609:                    // Join the group
0610:                    channel.join();
0611:                    currentGroup = channel.getGroup();
0612:                    multicastRequestAdapter = new MulticastRequestAdapter(
0613:                            // group channel
0614:                            channel
0615:                            , this /* MessageListener */
0616:                            , this /* MulticastRequestListener */
0617:                    );
0618:                    multicastRequestAdapter.start();
0619:
0620:                    // Let the MulticastRequestAdapter thread pump the membership out of the
0621:                    // JGroups channel.
0622:                    Thread.sleep(2000);
0623:
0624:                    logger.info("Group " + groupName + " connected to "
0625:                            + channel.getLocalMembership());
0626:
0627:                    // Add ourselves to the list of controllers
0628:                    controllerJmxAddress.put(channel.getLocalMembership(),
0629:                            controller.getJmxName());
0630:
0631:                    long controllerId;
0632:
0633:                    // Check if we are alone or not
0634:                    List currentGroupMembers = currentGroup.getMembers();
0635:                    int groupSize = currentGroupMembers.size();
0636:                    if (groupSize == 1) {
0637:                        logger
0638:                                .info(Translate
0639:                                        .get(
0640:                                                "virtualdatabase.distributed.configuration.first.in.group",
0641:                                                groupName));
0642:
0643:                        // Refuse to start if we are first but see no last-man-down marker
0644:                        if (!recoveryLog.isLastManDown())
0645:                            throw new VirtualDatabaseException(
0646:                                    "NOT STARTING VDB (not the last man down).");
0647:
0648:                        allMemberButUs = new ArrayList();
0649:                        controllerId = 0;
0650:                        distributedRequestManager.setControllerId(controllerId);
0651:                        if (resyncRecoveryLog) {
0652:                            // Init backends states from persistence base
0653:                            distributedRequestManager
0654:                                    .initBackendsLastKnownCheckpointFromRecoveryLog();
0655:                            recoveryLog.checkRecoveryLogConsistency();
0656:                        }
0657:                    } else {
0658:                        logger.info("Group now contains " + groupSize
0659:                                + " controllers.");
0660:                        if (logger.isDebugEnabled()) {
0661:                            logger
0662:                                    .debug("Current list of controllers is as follows:");
0663:                            for (Iterator iter = currentGroupMembers.iterator(); iter
0664:                                    .hasNext();)
0665:                                logger.debug("Controller " + iter.next());
0666:                        }
0667:
0668:                        refreshGroupMembership(); // also updates allMemberButUs
0669:
0670:                        // Check with the other controllers that our config is compatible
0671:                        controllerId = checkConfigurationCompatibilityAndReturnControllerId(getAllMemberButUs());
0672:                        if (controllerId == INCOMPATIBLE_CONFIGURATION) {
0673:                            String msg = Translate
0674:                                    .get("virtualdatabase.distributed.configuration.not.compatible");
0675:                            logger.error(msg);
0676:                            throw new ControllerException(msg);
0677:                        } else {
0678:                            // If several controllers join at the same time, they would all
0679:                            // get the same highest controller id value. We discriminate
0680:                            // them by adding their position in the membership. This
0681:                            // assumes that the membership is ordered the same way at all
0682:                            // nodes.
0683:                            controllerId += currentGroupMembers.indexOf(channel
0684:                                    .getLocalMembership());
0685:
0686:                            if (logger.isInfoEnabled()) {
0687:                                logger
0688:                                        .info(Translate
0689:                                                .get("virtualdatabase.distributed.configuration.compatible"));
0690:                                logger.info("Controller identifier is set to: "
0691:                                        + controllerId);
0692:                            }
0693:                            // Set the controller Id
0694:                            distributedRequestManager
0695:                                    .setControllerId(controllerId);
0696:                        }
0697:
0698:                        if (resyncRecoveryLog) {
0699:                            // Init backends states from persistence base
0700:                            distributedRequestManager
0701:                                    .initBackendsLastKnownCheckpointFromRecoveryLog();
0702:                        }
0703:
0704:                        // Distribute backends among controllers knowing that at this point
0705:                        // there is no conflict on the backend distribution policies.
0706:                        broadcastBackendInformation(getAllMemberButUs());
0707:                    }
0708:
0709:                    // Now let group comm messages flow in
0710:                    isVirtualDatabaseStarted = true;
0711:
0712:                    // Resync the recovery log, if any
0713:                    if (resyncRecoveryLog && (groupSize > 1)
0714:                            && hasRecoveryLog()) {
0715:                        logger.info("Resyncing recovery log ...");
0716:                        resyncRecoveryLog();
0717:                        logger.info("Resyncing recovery log done");
0718:                    }
0719:
0720:                    initGlobalCounters(getControllerId());
0721:
0722:                    recoveryLog.clearLastManDown();
0723:
0724:                } catch (Exception e) {
0725:                    if (channel != null) {
0726:                        quitChannel();
0727:                    }
0728:                    String msg = Translate.get(
0729:                            "virtualdatabase.distributed.joingroup.error",
0730:                            groupName);
0731:                    if (e instanceof RuntimeException)
0732:                        logger.error(msg, e);
0733:                    throw new Exception(msg + " (" + e + ")", e);
0734:                }
0735:
0736:            }
0737:
0738:            private void reconcile(List suspectedMembers) {
0739:                try {
0740:                    distributedRequestManager.blockActivity();
0741:                } catch (SQLException e) {
0742:                    logger.warn(e.getMessage(), e);
0743:                }
0744:
0745:                boolean shutdownLocally = false;
0746:
0747:                Iterator iter = suspectedMembers.iterator();
0748:                while (iter.hasNext()) {
0749:                    Member other = (Member) iter.next();
0750:                    if (isLocalSender(other)) {
0751:                        continue;
0752:                    }
0753:                    partitionReconciler = new PartitionReconciler(name,
0754:                            groupName, channel.getLocalMembership(),
0755:                            hederaPropertiesFile);
0756:
0757:                    PartitionReconciliationStatus reconciliationStatus;
0758:                    try {
0759:                        reconciliationStatus = partitionReconciler
0760:                                .reconcileWith(other);
0761:                        partitionReconciler.dispose();
0762:                    } catch (Exception e) {
0763:                        logger.error("Exception while reconciling with "
0764:                                + other, e);
0765:                        // When in doubt, arbitrarily shut down one of the vdb parts
0766:                        shutdownUnlessFirstMember(suspectedMembers);
0767:                        return;
0768:                    }
0769:                    String msg = "reconciliation status = "
0770:                            + reconciliationStatus + ", other = " + other;
0771:                    logger.info(msg);
0772:                    endUserLogger.info(msg);
0773:
0774:                    if (reconciliationStatus == PartitionReconciliationStatus.NO_ACTIVITY) {
0775:                        try {
0776:                            Collections.sort(suspectedMembers);
0777:                            Member firstMember = (Member) suspectedMembers
0778:                                    .get(0);
0779:                            if (!(firstMember.equals(channel
0780:                                    .getLocalMembership()))) {
0781:                                logger.info(channel.getLocalMembership()
0782:                                        + " is rejoining group " + currentGroup
0783:                                        + "...");
0784:                                quitChannel();
0785:                                channelShuttingDown = false;
0786:                                joinGroup(false);
0787:                                logger.info(channel.getLocalMembership()
0788:                                        + " has rejoined group " + currentGroup
0789:                                        + "...");
0790:                            }
0791:                            distributedRequestManager.resumeActivity();
0792:                        } catch (Exception e) {
0793:                            logger.error("Exception while reconciling with "
0794:                                    + other, e);
0795:                            // When in doubt, arbitrarily shut down one of the vdb parts
0796:                            shutdownUnlessFirstMember(suspectedMembers);
0797:                        }
0798:                        return;
0799:                    }
0800:                    if (reconciliationStatus == PartitionReconciliationStatus.ALONE_IN_THE_WORLD) {
0801:                        shutdown(Constants.SHUTDOWN_FORCE);
0802:                        return;
0803:                    }
0804:                    if (reconciliationStatus == PartitionReconciliationStatus.OTHER_ALONE_IN_THE_WORLD) {
0805:                        // do nothing
0806:                        continue;
0807:                    }
0808:                    // in all other cases, decide arbitrarily which vdb part will survive
0809:                    shutdownLocally = shutdownUnlessFirstMember(suspectedMembers);
0810:                    if (shutdownLocally) {
0811:                        return;
0812:                    }
0813:                }
0814:                if (logger.isInfoEnabled()) {
0815:                    logger.info("End of reconciliation process for " + name
0816:                            + ". Resume activity normally.");
0817:                }
0818:                distributedRequestManager.resumeActivity();
0819:            }
0820:
0821:            /*
0822:             * Shuts down the local member unless it is first in the members list, which
0823:             * is sorted in place by natural ordering. Returns true if it was shut down.
0824:             */
0825:            private boolean shutdownUnlessFirstMember(List members) {
0826:                Collections.sort(members);
0827:                Member firstMember = (Member) members.get(0);
0828:                logger.fatal(firstMember + " will remain as master.");
0829:                if (!(firstMember.equals(channel.getLocalMembership()))) {
0830:                    logger.fatal("Forcing virtual database shutdown here at "
0831:                            + channel.getLocalMembership() + ".");
0832:                    distributedRequestManager.resumeActivity();
0833:                    shutdown(Constants.SHUTDOWN_FORCE);
0834:                    return true;
0835:                } else {
0836:                    logger.fatal("Virtual database here at "
0837:                            + channel.getLocalMembership()
0838:                            + " remaining as master.");
0839:                    return false;
0840:                }
0841:            }
0842:
0843:            /**
0844:             * Checks if re-synchronizing the recovery log is necessary, and if so,
0845:             * initiates the recovery log recovery process and waits until it is complete.
0846:             * This is called as part of the vdb startup process, before backends are
0847:             * ready to be enabled.
0848:             *
0849:             * @throws VirtualDatabaseException in case of error
0850:             * @throws SQLException rethrown from recoverylog operations
0851:             */
0852:            private void resyncRecoveryLog() throws VirtualDatabaseException,
0853:                    SQLException {
0854:                if (getAllMembers().size() == 1) {
0855:                    logger
0856:                            .info("First controller in vdb, no recovery log resync.");
0857:                    return;
0858:                }
0859:
0860:                String lastShutdownCheckpointName = getLastShutdownCheckpointName();
0861:                if (lastShutdownCheckpointName == null) {
0862:                    // Reset the dirty recovery log if JVM property
0863:                    // "reset.dirty.recoverylog" is set to true (the default)
0864:                    if ("true".equals(System.getProperty(
0865:                            "reset.dirty.recoverylog", "true"))) {
0866:                        logger
0867:                                .info("No shutdown checkpoint found in recovery log. Clearing recovery log (dirty).");
0868:                        logger
0869:                                .info("Please resync manually using 'restore log'.");
0870:                        getRecoveryLog().resetRecoveryLog(false);
0871:                    } else {
0872:                        logger
0873:                                .info("No shutdown checkpoint found in recovery log: please resync manually using 'restore log'.");
0874:                        logger
0875:                                .warn("Keeping recovery log for debug purpose only because reset.dirty.recoverylog property is set to false. \n"
0876:                                        + "DO NOT PERFORM ADMIN COMMANDS ON THIS VIRTUAL DATABASE ON THIS NODE BEFORE RESTORING THE RECOVERY LOG.");
0877:                    }
0878:                    return;
0879:                }
0880:
0881:                // try to resync from the last shutdown checkpoint
0882:                isResynchingFlag = true;
0883:
0884:                try {
0885:                    logger.info("Resyncing from " + lastShutdownCheckpointName);
0886:                    resyncFromCheckpoint(lastShutdownCheckpointName);
0887:                } catch (VirtualDatabaseException e) {
0888:                    // Reset the dirty recovery log if JVM property
0889:                    // "reset.dirty.recoverylog" is set to true (the default)
0890:                    if ("true".equals(System.getProperty(
0891:                            "reset.dirty.recoverylog", "true"))) {
0892:                        logger
0893:                                .error("Failed to resync recovery log from last clean shutdown checkpoint. Clearing recovery log (dirty).");
0894:                        logger
0895:                                .info("Please resync manually using 'restore log'.");
0896:                        getRecoveryLog().resetRecoveryLog(false);
0897:                    } else {
0898:                        logger
0899:                                .error("Failed to resync recovery log from last clean shutdown checkpoint.");
0900:                        logger
0901:                                .warn("Keeping recovery log for debug purpose only because reset.dirty.recoverylog property is set to false. \n"
0902:                                        + "DO NOT PERFORM ADMIN COMMANDS ON THIS VIRTUAL DATABASE ON THIS NODE BEFORE RESTORING THE RECOVERY LOG.");
0903:                    }
0904:                    isResynchingFlag = false;
0905:                }
0906:            }
0907:
0908:            /**
0909:             * Initializes global counters based on recovery log.
0910:             *
0911:             * @param controllerId this controller id, as allocated by the group
0912:             *          communication. Base of all global counters numbering: counters are
0913:             *          laid out as [ controllerId | <local count> ]
0914:             * @throws SQLException if an error occurs accessing the recovery log
0915:             */
0916:            public void initGlobalCounters(long controllerId)
0917:                    throws SQLException {
0918:                // The counter update must be properly synchronized since it may happen
0919:                // concurrently with a vdb worker thread delivering unique IDs. This
0920:                // scenario occurs when using the "restore log" admin command following a
0921:                // vdb crash.
0922:                if (!hasRecoveryLog())
0923:                    return; // no recovery log: no init. Stick to default values (zero)
0924:                RecoveryLog recoveryLog = requestManager.getRecoveryLog();
0925:                requestManager.initializeRequestId(recoveryLog
0926:                        .getLastRequestId(controllerId) + 1);
0927:                requestManager.getScheduler().initializeTransactionId(
0928:                        recoveryLog.getLastTransactionId(controllerId) + 1);
0929:                synchronized (CONNECTION_ID_SYNC_OBJECT) {
0930:                    // Use the max operator as a safeguard: IDs may have been delivered but
0931:                    // not logged yet.
0932:                    this.connectionId = Math.max(this.connectionId, recoveryLog
0933:                            .getLastConnectionId(controllerId)) + 1;
0934:                }
0935:            }
0936:
0937:            private void resyncFromCheckpoint(String checkpointName)
0938:                    throws VirtualDatabaseException {
0939:                // talk to first other member in group
0940:                Member remoteControllerMember = (Member) getAllMemberButUs()
0941:                        .get(0);
0942:
0943:                // send message, no return expected, just block until it's done.
0944:                sendMessageToController(remoteControllerMember,
0945:                        new ResyncRecoveryLog(checkpointName), messageTimeouts
0946:                                .getDefaultTimeout());
0947:
0948:                // If a remote error occurred, or if the resync failed for any reason
0949:                // whatsoever, a VirtualDatabaseException has been thrown.
0950:
0951:                isResynchingFlag = false;
0952:            }
0953:
0954:            protected boolean isResyncing() {
0955:                return isResynchingFlag;
0956:            }
0957:
0958:            /**
0959:             * Returns the name of the last shutdown checkpoint, or null if none was found.
0960:             *
0961:             * @return the last ShutdownCheckpointName, or null if none was found.
0962:             * @throws VirtualDatabaseException in case getting the cp names list from the
0963:             *           recovery log failed
0964:             */
0965:            private String getLastShutdownCheckpointName()
0966:                    throws VirtualDatabaseException {
0967:                // get checkpoint names and see which is the last shutdown checkpoint. This
0968:                // list is expected to be ordered, newest first.
0969:                ArrayList checkpointNames;
0970:                try {
0971:                    checkpointNames = getRecoveryLog().getCheckpointNames();
0972:                } catch (SQLException e) {
0973:                    logger.error(e.getMessage());
0974:                    throw new VirtualDatabaseException(e);
0975:                }
0976:
0977:                Iterator iter = checkpointNames.iterator();
0978:                while (iter.hasNext()) {
0979:                    String cpName = (String) iter.next();
0980:                    if (cpName.startsWith("shutdown-" + getControllerName()))
0981:                        return cpName;
0982:                }
0983:                return null;
0984:            }
0985:
0986:            /**
0987:             * @see org.continuent.hedera.gms.GroupMembershipListener#joinMember(org.continuent.hedera.common.Member,
0988:             *      org.continuent.hedera.common.GroupIdentifier)
0989:             */
0990:            public void joinMember(Member m, GroupIdentifier gid) {
0991:                if (hasRecoveryLog()) {
0992:                    try {
0993:                        requestManager.getRecoveryLog()
0994:                                .storeCheckpoint(
0995:                                        buildCheckpointName(m
0996:                                                + " joined group " + gid));
0997:                    } catch (SQLException ignore) {
0998:                        logger
0999:                                .warn("Failed to log checkpoint for joining member "
1000:                                        + m);
1001:                    }
1002:                }
1003:            }
1004:
1005:            //
1006:            // Message dispatcher request handling
1007:            //
1008:
1009:            /**
1010:             * Terminate the multicast request adapter and quit the Hedera channel.
1011:             */
1012:            public void quitChannel() {
1013:                quitChannel(Constants.SHUTDOWN_SAFE);
1014:            }
1015:
1016:            /**
1017:             * Terminate the multicast request adapter and quit the Hedera channel.
1018:             *
1019:             * @param level of the vdb shutdown operation being executed
1020:             */
1021:            public void quitChannel(int level) {
1022:                if (level == Constants.SHUTDOWN_FORCE && multicastRequestAdapter != null) {
1023:                    multicastRequestAdapter.cancelRequests();
1024:                }
1025:                synchronized (MESSAGES_IN_HANDLER_SYNC) {
1026:                    channelShuttingDown = true;
1027:                    if (messagesInHandlers > 0)
1028:                        try {
1029:                            MESSAGES_IN_HANDLER_SYNC.wait();
1030:                        } catch (InterruptedException ignore) {
1031:                        }
1032:                }
1033:
1034:                if (multicastRequestAdapter != null) {
1035:                    multicastRequestAdapter.stop();
1036:                    multicastRequestAdapter = null;
1037:                }
1038:                if (channel != null) {
1039:                    channel.close();
1040:                    try {
1041:                        channel.quit();
1042:                    } catch (ChannelException e) {
1043:                        if (logger.isWarnEnabled()) {
1044:                            logger.warn("Problem when quitting channel "
1045:                                    + channel, e);
1046:                        }
1047:                    } catch (NotConnectedException e) {
1048:                        if (logger.isWarnEnabled()) {
1049:                            logger.warn("Problem when quitting channel "
1050:                                    + channel, e);
1051:                        }
1052:                    }
1053:                    channel = null;
1054:                }
1055:                if (groupCommunicationFactory != null) {
1056:                    // If we were not able to successfully dispose the factory we must keep a
1057:                    // reference to it in order to dispose it later
1058:                    if (groupCommunicationFactory.dispose())
1059:                        groupCommunicationFactory = null;
1060:                }
1061:            }
1062:
1063:            /**
1064:             * @see org.continuent.hedera.gms.GroupMembershipListener#quitMember(org.continuent.hedera.common.Member,
1065:             *      org.continuent.hedera.common.GroupIdentifier)
1066:             */
1067:            public void quitMember(Member m, GroupIdentifier gid) {
1068:                synchronized (MESSAGES_IN_HANDLER_SYNC) {
1069:                    // Ignore if channel has been closed (i.e. vdb shutdown)
1070:                    if ((channel == null) || (channelShuttingDown))
1071:                        return;
1072:                    messagesInHandlers++;
1073:                }
1074:
1075:                try {
1076:                    // Ignore our own quit message
1077:                    if (isLocalSender(m))
1078:                        return;
1079:
1080:                    if (hasRecoveryLog()) {
1081:                        try {
                            requestManager.getRecoveryLog().storeCheckpoint(
                                    buildCheckpointName(m + " quit group "
                                            + gid));
                        } catch (SQLException ignore) {
                            logger
                                    .warn("Failed to log checkpoint for quitting member "
                                            + m);
                        }
                    }

                    ActivityService.getInstance().addUnreachableMember(name,
                            m.getAddress());

                    // Remove controller from list and notify JMX listeners
                    String remoteControllerName = removeRemoteControllerAndStartCleanupThread(m);
                    if (remoteControllerName != null) {
                        endUserLogger.warn(Translate.get(
                                "notification.distributed.controller.removed",
                                new String[] { m.toString(), name }));
                        logger.warn("Controller " + m
                                + " has left the cluster.");
                        sendJmxNotification(
                                SequoiaNotificationList.DISTRIBUTED_CONTROLLER_REMOVED,
                                Translate
                                        .get(
                                                "notification.distributed.controller.removed",
                                                m, name));
                    }

                    // Notify adapter that we do not expect responses anymore from this member
                    synchronized (MESSAGES_IN_HANDLER_SYNC) {
                        // Ignore if channel is being closed, i.e. vdb shutdown.
                        if (!channelShuttingDown) {
                            int failures = multicastRequestAdapter
                                    .memberFailsOnAllReplies(m);
                            logger.info(failures
                                    + " requests were waiting responses from "
                                    + m);
                        }
                    }
                } finally {
                    synchronized (MESSAGES_IN_HANDLER_SYNC) {
                        messagesInHandlers--;
                    }
                }
            }

            /**
             * @see org.continuent.hedera.adapters.MessageListener#receive(java.io.Serializable)
             */
            public void receive(Serializable msg) {
                logger
                        .error("Distributed virtual database received unhandled message: "
                                + msg);
            }

            /**
             * Refresh the current group membership when someone has joined or left the
             * group.
             */
            private void refreshGroupMembership() {
                if (logger.isDebugEnabled())
                    logger.debug("Refreshing members list:"
                            + currentGroup.getMembers());

                synchronized (controllerJmxAddress) {
                    allMemberButUs = (ArrayList) (((ArrayList) currentGroup
                            .getMembers()).clone());
                    allMemberButUs.remove(channel.getLocalMembership());
                }
            }

            //
            // Getter/Setter and tools (equals, ...)
            //

            /**
             * Two distributed virtual databases are equal if they have the same name
             * and the same group name.
             *
             * @param other an object
             * @return true if other is a distributed virtual database with the same
             *         name and group name
             */
            public boolean equals(Object other) {
                if ((other == null)
                        || (!(other instanceof DistributedVirtualDatabase)))
                    return false;
                else {
                    DistributedVirtualDatabase db = (DistributedVirtualDatabase) other;
                    return name.equals(db.getDatabaseName())
                            && groupName.equals(db.getGroupName());
                }
            }

            /**
             * Synchronized access to current group members.
             *
             * @return a clone of the list of all members (never null).
             */
            public ArrayList getAllMembers() {
                // FIXME Should not return ArrayList but rather List
                synchronized (controllerJmxAddress) {
                    if (currentGroup == null) // this happens if we did not #joinGroup()
                        return new ArrayList();
                    ArrayList members = (ArrayList) currentGroup.getMembers();
                    if (members == null) // SEQUOIA-745 fix
                        return new ArrayList();
                    return (ArrayList) members.clone();
                }
            }

            /**
             * Returns the list of all members in the group except us. Consider the value
             * read-only (do not alter).
             *
             * @return the allMemberButUs field (never null).
             */
            public ArrayList getAllMemberButUs() {
                synchronized (controllerJmxAddress) {
                    if (allMemberButUs == null) // this happens if we did not #joinGroup()
                        return new ArrayList();

                    /*
                     * This synchronized block might seem lousy, but it is actually enough
                     * as long as no caller alters the returned value: the allMemberButUs
                     * field is replaced (as opposed to updated) by
                     * refreshGroupMembership(). A caller therefore lives with a possibly
                     * outdated list, but still a safe one (never updated concurrently by
                     * vdb threads). If callers cannot be trusted to leave the returned
                     * value untouched, return a clone instead.
                     */
                    return allMemberButUs;
                }
            }

            /**
             * Get the group channel used for group communications
             *
             * @return the <code>AbstractReliableGroupChannel</code> of this virtual
             *         database
             */
            public AbstractReliableGroupChannel getChannel() {
                return channel;
            }

            /**
             * Returns the cleanupThreads value.
             *
             * @return Returns the cleanupThreads.
             */
            public Hashtable getCleanupThreads() {
                return cleanupThreads;
            }

            /**
             * Used by the ControllerFailureCleanupThread to clean up following a
             * controller failure. This returns the list of recovered transactions and
             * removes the list.
             *
             * @param controllerId the id of the failed controller
             * @return List of recovered transactions for the given controller id (null if
             *         none)
             */
            public List getTransactionsRecovered(Long controllerId) {
                return (List) controllerTransactionsRecovered
                        .remove(controllerId);
            }

            /**
             * Used by the ControllerFailureCleanupThread to clean up following a
             * controller failure. This returns the list of recovered persistent
             * connections and removes the list.
             *
             * @param controllerId the id of the failed controller
             * @return List of recovered persistent connections for the given controller
             *         id (null if none)
             */
            public List getControllerPersistentConnectionsRecovered(
                    Long controllerId) {
                return (List) controllerPersistentConnectionsRecovered
                        .remove(controllerId);
            }

            /**
             * Called by FailoverForPersistentConnection when a client reconnects
             * following a controller failure.
             *
             * @param controllerId the id of the failed controller
             * @param connectionId the id of the persistent connection
             */
            public void notifyPersistentConnectionFailover(Long controllerId,
                    Long connectionId) {
                synchronized (controllerPersistentConnectionsRecovered) {
                    LinkedList persistentConnectionsRecovered = (LinkedList) controllerPersistentConnectionsRecovered
                            .get(controllerId);
                    if (persistentConnectionsRecovered == null) {
                        persistentConnectionsRecovered = new LinkedList();
                        controllerPersistentConnectionsRecovered.put(
                                controllerId, persistentConnectionsRecovered);
                    }

                    persistentConnectionsRecovered.add(connectionId);
                    if (logger.isInfoEnabled())
                        logger
                                .info("Failover detected for persistent connection "
                                        + connectionId);
                }
            }

            /**
             * Called by FailoverForTransaction when a client reconnects following a
             * controller failure.
             *
             * @param controllerId the id of the failed controller
             * @param transactionId the id of the transaction
             */
            public void notifyTransactionFailover(Long controllerId,
                    Long transactionId) {
                synchronized (controllerTransactionsRecovered) {
                    LinkedList transactionsRecovered = (LinkedList) controllerTransactionsRecovered
                            .get(controllerId);
                    if (transactionsRecovered == null) {
                        transactionsRecovered = new LinkedList();
                        controllerTransactionsRecovered.put(controllerId,
                                transactionsRecovered);
                    }

                    transactionsRecovered.add(transactionId);
                    if (logger.isInfoEnabled())
                        logger.info("Failover detected for transaction "
                                + transactionId);
                }
            }

            /**
             * Gets a Controller specified by its name as a Member object suitable for
             * group communication.
             *
             * @param controllerName the name of the target controller
             * @return a Member representing the target controller
             * @throws VirtualDatabaseException if no controller with this name is
             *           found in the group
             */
            private Member getControllerByName(String controllerName)
                    throws VirtualDatabaseException {
                // Get the target controller
                Iterator iter = controllerJmxAddress.entrySet().iterator();
                Member targetMember = null;
                while (iter.hasNext()) {
                    Entry entry = (Entry) iter.next();
                    if (entry.getValue().equals(controllerName)) {
                        targetMember = (Member) entry.getKey();
                        break;
                    }
                }
                if (targetMember == null)
                    throw new VirtualDatabaseException(
                            "Cannot find controller " + controllerName
                                    + " in group");
                return targetMember;
            }

            /**
             * Returns the controllerName value.
             *
             * @return Returns the controllerName.
             */
            public String getControllerName() {
                return controller.getControllerName();
            }

            /**
             * Returns the controller ID.
             *
             * @return Returns the controller ID.
             */
            public long getControllerId() {
                return ((DistributedRequestManager) requestManager)
                        .getControllerId();
            }

            /**
             * Returns the currentGroup value.
             *
             * @return Returns the currentGroup.
             */
            public Group getCurrentGroup() {
                return currentGroup;
            }

            /**
             * Returns the distributedRequestLogger value.
             *
             * @return Returns the distributedRequestLogger.
             */
            public final Trace getDistributedRequestLogger() {
                return distributedRequestLogger;
            }

            /**
             * Get the XML dump of the Distribution element.
             *
             * @return XML dump of the Distribution element
             */
            protected String getDistributionXml() {
                StringBuffer info = new StringBuffer();
                info.append("<" + DatabasesXmlTags.ELT_Distribution + " "
                        + DatabasesXmlTags.ATT_groupName + "=\"" + groupName
                        + "\" " + DatabasesXmlTags.ATT_hederaPropertiesFile
                        + "=\"" + hederaPropertiesFile + "\" "
                        + DatabasesXmlTags.ATT_clientFailoverTimeout + "=\""
                        + failoverTimeoutInMs + "\">");

                getMessageTimeouts().generateXml(info);

                info.append("</" + DatabasesXmlTags.ELT_Distribution + ">");
                return info.toString();
            }

            /**
             * Returns the group name this virtual database belongs to.
             *
             * @return a <code>String</code> value. Returns <code>null</code> if this
             *         virtual database is standalone
             */
            public String getGroupName() {
                return groupName;
            }

            /**
             * Sets the group name used by the controllers hosting this virtual database.
             *
             * @param groupName the group name to set
             */
            public void setGroupName(String groupName) {
                this.groupName = groupName;
            }

            /**
             * Returns the messageTimeouts value.
             *
             * @return Returns the messageTimeouts.
             */
            public MessageTimeouts getMessageTimeouts() {
                return messageTimeouts;
            }

            /**
             * Sets the messageTimeouts value.
             *
             * @param messageTimeouts The messageTimeouts to set.
             */
            public void setMessageTimeouts(MessageTimeouts messageTimeouts) {
                this.messageTimeouts = messageTimeouts;
            }

            /**
             * Return the group communication multicast request adapter.
             *
             * @return the group communication multicast request adapter
             */
            public MulticastRequestAdapter getMulticastRequestAdapter() {
                return multicastRequestAdapter;
            }

            /**
             * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#getNextConnectionId()
             */
            public long getNextConnectionId() {
                long id = super.getNextConnectionId();
                return distributedRequestManager.getNextConnectionId(id);
            }

            protected int getNumberOfEnabledBackends()
                    throws VirtualDatabaseException {
                // 1/ get number of local active backends
                int nbActive = super.getNumberOfEnabledBackends();

                // 2/ add remote active backends

                // TODO: synchronize this access to backendsPerController (and others)
                DatabaseBackend b;
                Iterator iter = backendsPerController.keySet().iterator();
                while (iter.hasNext()) {
                    Member member = (Member) iter.next();

                    List remoteBackends = (List) backendsPerController
                            .get(member);
                    int size = remoteBackends.size();
                    b = null;
                    for (int i = 0; i < size; i++) {
                        b = (DatabaseBackend) remoteBackends.get(i);
                        if (b.isReadEnabled() || b.isWriteEnabled())
                            // test symmetrical to RequestManager.backupBackend()
                            nbActive++;
                    }
                }
                /*
                 * Temporary: until backendsPerController is really updated (not yet
                 * done), discard the count computed above and return -1 so that
                 * backupBackend() acts as if force=true.
                 */
                nbActive = -1;
                return nbActive;
            }

            /**
             * Return a ControllerResultSet containing the PreparedStatement metadata of
             * the given SQL template.
             *
             * @param request the request containing the SQL template
             * @return an empty ControllerResultSet with the metadata
             * @throws SQLException if a database error occurs
             */
            public ControllerResultSet getPreparedStatementGetMetaData(
                    AbstractRequest request) throws SQLException {
                try {
                    return requestManager
                            .getPreparedStatementGetMetaData(request);
                } catch (NoMoreBackendException e) {
                    // Try remote controllers
                    try {
                        MulticastResponse rspList = getMulticastRequestAdapter()
                                .multicastMessage(
                                        getAllMemberButUs(),
                                        new GetPreparedStatementMetadata(
                                                request),
                                        MulticastRequestAdapter.WAIT_ALL,
                                        getMessageTimeouts()
                                                .getVirtualDatabaseConfigurationTimeout());

                        Map results = rspList.getResults();
                        if (results.size() == 0)
                            if (logger.isWarnEnabled())
                                logger
                                        .warn("No response while getting prepared statement metadata from remote controller");
                        for (Iterator iter = results.values().iterator(); iter
                                .hasNext();) {
                            Object response = iter.next();
                            if (response instanceof ControllerException) {
                                if (logger.isErrorEnabled()) {
                                    logger
                                            .error("Error while getting prepared statement metadata from remote controller");
                                }
                            } else {
                                // Here we succeeded in getting prepared statement metadata
                                // from a remote controller
                                return (ControllerResultSet) response;
                            }
                        }
                    } catch (NotConnectedException e2) {
                        if (logger.isErrorEnabled())
                            logger
                                    .error(
                                            "Channel unavailable while getting prepared statement metadata from remote controller",
                                            e2);
                    }

                    // Here we did not succeed in getting prepared statement metadata
                    // from another controller, so rethrow the original exception
                    throw e;
                }
            }

            /**
             * Returns the processMacroBeforeBroadcast value.
             *
             * @return Returns the processMacroBeforeBroadcast.
             */
            public boolean isProcessMacroBeforeBroadcast() {
                return processMacroBeforeBroadcast;
            }

            /**
             * Sets the processMacroBeforeBroadcast value.
             *
             * @param processMacros true if macros must be processed before broadcast.
             */
            public void setProcessMacroBeforeBroadcast(boolean processMacros) {
                this.processMacroBeforeBroadcast = processMacros;
            }

            /**
             * Returns the request result failover cache associated to this distributed
             * virtual database.
             *
             * @return a <code>RequestResultFailoverCache</code> object.
             */
            public RequestResultFailoverCache getRequestResultFailoverCache() {
                return requestResultFailoverCache;
            }

            /**
             * Sets a new distributed request manager for this database.
             *
             * @param requestManager the new request manager.
             */
            public void setRequestManager(RequestManager requestManager) {
                if (!(requestManager instanceof DistributedRequestManager))
                    throw new RuntimeException(
                            "A distributed virtual database can only work with a distributed request manager.");

                distributedRequestManager = (DistributedRequestManager) requestManager;
                // really, this is super.requestManager
                this.requestManager = distributedRequestManager;
            }

            /**
             * Get the whole static metadata for this virtual database. A new empty
             * metadata object is created if there was none yet. It will be filled later
             * by gatherStaticMetadata() when the backend is enabled.
             *
             * @return Virtual database static metadata
             */
            public VirtualDatabaseStaticMetaData getStaticMetaData() {
                staticMetadata = doGetStaticMetaData();

                // If no backends enabled and vdb is distributed try remote controllers
                if ((staticMetadata == null)
                        || (staticMetadata.getMetadataContainer() == null)) {
                    try {
                        MulticastResponse rspList = getMulticastRequestAdapter()
                                .multicastMessage(
                                        getAllMemberButUs(),
                                        new GetStaticMetadata(),
                                        MulticastRequestAdapter.WAIT_ALL,
                                        getMessageTimeouts()
                                                .getVirtualDatabaseConfigurationTimeout());

                        Map results = rspList.getResults();
                        if (results.size() == 0)
                            if (logger.isWarnEnabled())
                                logger
                                        .warn("No response while getting static metadata from remote controller");
                        for (Iterator iter = results.values().iterator(); iter
                                .hasNext();) {
                            Object response = iter.next();
                            if (response instanceof ControllerException) {
                                if (logger.isErrorEnabled()) {
                                    logger
                                            .error("Error while getting static metadata from remote controller");
                                }
                            } else {
                                // Here we succeeded in getting static metadata from a remote
                                // controller
                                staticMetadata
                                        .setMetadataContainer((MetadataContainer) response);
                            }
                        }
                    } catch (NotConnectedException e2) {
                        if (logger.isErrorEnabled())
                            logger
                                    .error(
                                            "Channel unavailable while getting static metadata from remote controller",
                                            e2);
                    }
                }

                return staticMetadata;
            }

            /**
             * Check if the given backend definition is compatible with the backend
             * definitions of this distributed virtual database. Note that if the given
             * backend does not exist in the current configuration, it is considered
             * compatible. Incompatibility results from 2 backends with the same JDBC URL
             * or the same logical name.
             *
             * @param backend the backend to check
             * @return true if the backend is compatible with the local definition
             * @throws VirtualDatabaseException if locking the local backend list fails
             */
            public boolean isCompatibleBackend(BackendInfo backend)
                    throws VirtualDatabaseException {
                try {
                    acquireReadLockBackendLists();
                } catch (InterruptedException e) {
                    String msg = "Unable to acquire read lock on backend list in isCompatibleBackend ("
                            + e + ")";
                    logger.error(msg);
                    throw new VirtualDatabaseException(msg);
                }

                try {
                    // Find the backend
                    String backendURL = backend.getUrl();
                    String backendName = backend.getName();
                    int size = backends.size();
                    DatabaseBackend b = null;
                    for (int i = 0; i < size; i++) {
                        b = (DatabaseBackend) backends.get(i);
                        if (b.getURL().equals(backendURL)
                                || b.getName().equals(backendName))
                            return false;
                    }
                } catch (RuntimeException re) {
                    throw new VirtualDatabaseException(re);
                } finally {
                    releaseReadLockBackendLists();
                }
                // This backend does not exist here
                return true;
            }

            /**
             * Return true if the provided schema is compatible with the existing schema
             * of this distributed virtual database. Note that if the given schema is
             * null, this function returns true.
             *
             * @param dbs the database schema to compare with
             * @return true if dbs is compatible with the current schema (according to
             *         RAIDb level)
             */
            public boolean isCompatibleDatabaseSchema(DatabaseSchema dbs) {
                // Database schema checking (if any)
                if (dbs == null) {
                    logger
                            .warn(Translate
                                    .get("virtualdatabase.distributed.configuration.checking.noschema"));
                } else {
                    // Check database schemas compatibility
                    switch (getRequestManager().getLoadBalancer()
                            .getRAIDbLevel()) {
                    case RAIDbLevels.RAIDb0:
                        // There must be no overlap between schemas
                        if (dbs.equals(getRequestManager().getDatabaseSchema())) {
                            logger
                                    .warn(Translate
                                            .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
                            return false;
                        }
                        break;
                    case RAIDbLevels.RAIDb1:
                        // Schemas must be identical
                        if (!dbs
                                .equals(getRequestManager().getDatabaseSchema())) {
                            logger
                                    .warn(Translate
                                            .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
                            return false;
                        }
                        break;
                    case RAIDbLevels.RAIDb2:
                        // Common parts of the schema must be identical
                        if (!dbs.isCompatibleWith(getRequestManager()
                                .getDatabaseSchema())) {
                            logger
                                    .warn(Translate
                                            .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
                            return false;
                        }
                        break;
                    case RAIDbLevels.SingleDB:
                    default:
                        logger.error("Unsupported RAIDb level: "
                                + getRequestManager().getLoadBalancer()
                                        .getRAIDbLevel());
                        return false;
                    }
                }
                return true;
            }

            /**
             * Is this virtual database distributed?
             *
             * @return true
             */
1759:            public boolean isDistributed() {
1760:                return true;
1761:            }
1762:
1763:            /**
1764:             * Returns the isVirtualDatabaseStarted value.
1765:             *
1766:             * @return Returns the isVirtualDatabaseStarted.
1767:             */
1768:            public final boolean isVirtualDatabaseStarted() {
1769:                return isVirtualDatabaseStarted;
1770:            }
1771:
1772:            /**
1773:             * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#getControllers()
1774:             */
1775:            public String[] viewControllerList() {
1776:                if (logger.isInfoEnabled()) {
1777:                    logger.info(channel.getLocalMembership() + " see members:"
1778:                            + currentGroup.getMembers() + " and has mapping:"
1779:                            + controllerJmxAddress);
1780:                }
1781:                Collection controllerJmxNames = controllerJmxAddress.values();
1782:                return (String[]) controllerJmxNames
1783:                        .toArray(new String[controllerJmxNames.size()]);
1784:            }

    /**
     * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#viewGroupBackends()
     */
    public Hashtable viewGroupBackends()
            throws VirtualDatabaseException {
        Hashtable map = super.viewGroupBackends();
        synchronized (backendsPerController) {
            Iterator iter = backendsPerController.keySet().iterator();
            while (iter.hasNext()) {
                Member member = (Member) iter.next();

                // Create a List<BackendInfo> from the member backend list
                List backends = (List) backendsPerController
                        .get(member);
                List backendInfos = DatabaseBackend
                        .toBackendInfos(backends);
                map.put(controllerJmxAddress.get(member), backendInfos);
            }
        }
        return map;
    }

    //
    // Distributed virtual database management functions
    //

    /**
     * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#addBackend(org.continuent.sequoia.controller.backend.DatabaseBackend)
     */
    public void addBackend(DatabaseBackend db)
            throws VirtualDatabaseException {
        // Add the backend to the virtual database.
        super.addBackend(db);

        // Send a group message if already joined group
        try {
            broadcastBackendInformation(getAllMemberButUs());
        } catch (Exception e) {
            String msg = "Error while broadcasting backend information when adding backend";
            logger.error(msg, e);
            throw new VirtualDatabaseException(msg, e);
        }
    }

    /**
     * Add a controller id to the controllerIds map.
     *
     * @param remoteControllerMembership the membership identifying the remote
     *          controller
     * @param remoteControllerId remote controller identifier
     */
    public void addRemoteControllerId(
            Member remoteControllerMembership, long remoteControllerId) {
        controllerIds.put(remoteControllerMembership, new Long(
                remoteControllerId));

        if (logger.isDebugEnabled())
            logger.debug("Adding new controller id:"
                    + remoteControllerId + " for member "
                    + remoteControllerMembership);
    }

    /**
     * Add a list of remote backends to the backendsPerController map.
     *
     * @param sender the membership identifying the remote controller
     * @param remoteBackends remote controller backends
     */
    public void addBackendPerController(Member sender,
            List remoteBackends) {
        backendsPerController.put(sender, remoteBackends);

        if (logger.isInfoEnabled())
            logger.info(Translate.get(
                    "virtualdatabase.distributed.configuration.updating.backend.list",
                    sender));
    }

    /**
     * Returns the local view of the backends in this virtual database across all
     * <em>remote</em> controllers.
     *
     * @return a Hashtable&lt;Member, List&lt;DatabaseBackend&gt;&gt;
     */
    public Hashtable getBackendsPerController() {
        return backendsPerController;
    }

    /**
     * Add a new controller name to the controllerJmxAddress map and refresh the
     * group membership.
     *
     * @param remoteControllerMembership the membership identifying the remote
     *          controller
     * @param remoteControllerJmxName the JMX name of the remote controller
     */
    public void addRemoteControllerJmxName(
            Member remoteControllerMembership,
            String remoteControllerJmxName) {
        controllerJmxAddress.put(remoteControllerMembership,
                remoteControllerJmxName);
        if (logger.isDebugEnabled())
            logger.debug("Adding new controller "
                    + remoteControllerJmxName + " for member "
                    + remoteControllerMembership);

        sendJmxNotification(
                SequoiaNotificationList.DISTRIBUTED_CONTROLLER_ADDED,
                Translate.get(
                        "notification.distributed.controller.added",
                        new String[] { remoteControllerJmxName, name }));

        refreshGroupMembership();
    }

    /**
     * Broadcast backend information among controllers.
     *
     * @param dest List of <code>Member</code> to send the message to
     * @throws NotConnectedException if the channel is not connected
     */
    private void broadcastBackendInformation(ArrayList dest)
            throws NotConnectedException {
        logger.debug(Translate.get(
                "virtualdatabase.distributed.configuration.querying.remote.status"));

        // Send our backend status using serializable BackendInfo
        List backendInfos = DatabaseBackend.toBackendInfos(backends);
        MulticastResponse rspList = multicastRequestAdapter
                .multicastMessage(dest, new BackendStatus(backendInfos,
                        distributedRequestManager.getControllerId()),
                        MulticastRequestAdapter.WAIT_ALL,
                        messageTimeouts.getBackendStatusTimeout());

        int size = dest.size();
        for (int i = 0; i < size; i++) {
            // Add the backend configuration of every remote controller
            Member m = (Member) dest.get(i);
            if (rspList.getResult(m) != null) {
                BackendStatus bs = (BackendStatus) rspList.getResult(m);
                // Update backend list from sender
                List remoteBackendInfos = bs.getBackendInfos();
                // Convert the BackendInfos to DatabaseBackends
                List remoteBackends = BackendInfo
                        .toDatabaseBackends(remoteBackendInfos);
                backendsPerController.put(m, remoteBackends);
                if (logger.isDebugEnabled())
                    logger.debug(Translate.get(
                            "virtualdatabase.distributed.configuration.updating.backend.list",
                            m.toString()));
            } else
                logger.warn(Translate.get(
                        "virtualdatabase.distributed.unable.get.remote.status",
                        m.toString()));
        }
    }

    /**
     * Send the configuration of this controller to the remote controllers. All
     * remote controllers must agree on the compatibility of the local controller
     * configuration with their own configuration. Compatibility checking includes
     * Authentication Manager, Scheduler and Load Balancer settings.
     *
     * @param dest List of <code>Address</code> to send the message to
     * @return INCOMPATIBLE_CONFIGURATION if the configuration is not compatible
     *         with other controllers or the controller id to use otherwise.
     */
    private long checkConfigurationCompatibilityAndReturnControllerId(
            ArrayList dest) {
        if (logger.isInfoEnabled())
            logger.info(Translate.get(
                    "virtualdatabase.distributed.configuration.checking"));

        // Send our configuration
        MulticastResponse rspList;
        try {
            rspList = multicastRequestAdapter.multicastMessage(dest,
                    new VirtualDatabaseConfiguration(this),
                    MulticastRequestAdapter.WAIT_ALL, messageTimeouts
                            .getVirtualDatabaseConfigurationTimeout());
        } catch (NotConnectedException e) {
            logger.error(
                    "Channel unavailable while checking configuration compatibility",
                    e);
            return INCOMPATIBLE_CONFIGURATION;
        }

        // Check that everybody agreed
        Map results = rspList.getResults();
        int size = results.size();
        if (size == 0)
            logger.warn(Translate.get(
                    "virtualdatabase.distributed.configuration.checking.noanswer"));

        long highestRemoteControllerId = 0;
        for (Iterator iter = results.values().iterator(); iter
                .hasNext();) {
            Object response = iter.next();
            if (response instanceof VirtualDatabaseConfigurationResponse) {
                // highestRemoteControllerId and remoteControllerId are returned
                // directly by the remote controllers and are therefore in shifted
                // form: the effective bits are the upper 16 bits. See
                // DistributedRequestManager.CONTROLLER_ID_BITS
                VirtualDatabaseConfigurationResponse vdbcr = (VirtualDatabaseConfigurationResponse) response;
                long remoteControllerId = vdbcr.getControllerId();
                if (remoteControllerId == INCOMPATIBLE_CONFIGURATION) {
                    return INCOMPATIBLE_CONFIGURATION;
                }
                // Check whether some vdb users are still missing locally.
                // If so, try to add them dynamically.
                if (vdbcr.getAdditionalVdbUsers() != null) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Some virtual database users are missing from this configuration, trying to create them transparently...");
                    }
                    for (Iterator iter2 = vdbcr.getAdditionalVdbUsers()
                            .iterator(); iter2.hasNext();) {
                        VirtualDatabaseUser vdbUser = (VirtualDatabaseUser) iter2
                                .next();

                        // Using the "super" trick here probably means bad design.
                        // The intent is to create the vdb users just locally, hence we
                        // use the method in VirtualDatabase rather than the overridden
                        // method in DistributedVirtualDatabase.
                        super.checkAndAddVirtualDatabaseUser(vdbUser);

                        if (!getAuthenticationManager()
                                .isValidVirtualUser(vdbUser)) {
                            return INCOMPATIBLE_CONFIGURATION;
                        }
                    }
                }

                if (highestRemoteControllerId < remoteControllerId)
                    highestRemoteControllerId = remoteControllerId;
            } else {
                logger.error("Unexpected response while checking configuration compatibility: "
                        + response);
                return INCOMPATIBLE_CONFIGURATION;
            }
        }

        // Ok, everybody agreed that our configuration is compatible.
        // Take the highest controller id + 1 as our id. The result is non-shifted
        // because it is passed to setControllerId, which expects 16 bits.
        return ((highestRemoteControllerId >> DistributedRequestManager.CONTROLLER_ID_SHIFT_BITS)
                & DistributedRequestManager.CONTROLLER_ID_BITS) + 1;
    }

    /**
     * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#checkAndAddVirtualDatabaseUser(org.continuent.sequoia.common.users.VirtualDatabaseUser)
     */
    public void checkAndAddVirtualDatabaseUser(
            VirtualDatabaseUser vdbUser) {
        // Is vdb user valid?
        MulticastResponse rspList;
        try {
            rspList = multicastRequestAdapter.multicastMessage(
                    getAllMembers(), new IsValidUserForAllBackends(
                            vdbUser), MulticastRequestAdapter.WAIT_ALL,
                    messageTimeouts
                            .getVirtualDatabaseConfigurationTimeout());
        } catch (NotConnectedException e) {
            logger.error(
                    "Channel unavailable while checking validity of vdb user "
                            + vdbUser.getLogin(), e);
            return;
        }

        // Check that everybody agreed
        Map results = rspList.getResults();
        int size = results.size();
        if (size == 0)
            logger.warn("No response while checking validity of vdb user "
                    + vdbUser.getLogin());
        for (Iterator iter = results.values().iterator(); iter
                .hasNext();) {
            Object response = iter.next();
            if (response instanceof Boolean) {
                if (!((Boolean) response).booleanValue()) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Could not create new vdb user "
                                + vdbUser.getLogin()
                                + " because it does not exist on all backends");
                    }
                    return;
                }
            } else {
                logger.error("Unexpected response while checking validity of vdb user "
                        + vdbUser.getLogin() + " : " + response);
                return;
            }
        }

        // Add user
        try {
            rspList = multicastRequestAdapter.multicastMessage(
                    getAllMembers(),
                    new AddVirtualDatabaseUser(vdbUser),
                    MulticastRequestAdapter.WAIT_ALL, messageTimeouts
                            .getVirtualDatabaseConfigurationTimeout());
        } catch (NotConnectedException e) {
            logger.error("Channel unavailable while adding vdb user "
                    + vdbUser.getLogin()
                    + ", trying to clean-up...", e);
            removeVirtualDatabaseUser(vdbUser);
            // Stop here: rspList still holds the validity-check responses
            return;
        }

        // Check for exceptions
        results = rspList.getResults();
        size = results.size();
        if (size == 0)
            logger.warn("No response while adding vdb user "
                    + vdbUser.getLogin());
        for (Iterator iter = results.values().iterator(); iter
                .hasNext();) {
            Object response = iter.next();
            if (response instanceof ControllerException) {
                if (logger.isErrorEnabled()) {
                    logger.error("Error while adding vdb user "
                            + vdbUser.getLogin()
                            + ", trying to clean-up...");
                }
                removeVirtualDatabaseUser(vdbUser);
                return;
            }
        }
    }

    /**
     * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#closePersistentConnection(java.lang.String,
     *      long)
     */
    public void closePersistentConnection(String login,
            long persistentConnectionId) {
        distributedRequestManager.distributedClosePersistentConnection(
                login, persistentConnectionId);
    }

    /**
     * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#openPersistentConnection(java.lang.String,
     *      long)
     */
    public void openPersistentConnection(String login,
            long persistentConnectionId) throws SQLException {
        distributedRequestManager.distributedOpenPersistentConnection(
                login, persistentConnectionId);
    }

    /**
     * This method only initiates the copy. It is the remote controller's vdb
     * that performs the actual copy, fetching the dump from this vdb's local
     * backuper.
     *
     * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#copyDump(java.lang.String,
     *      java.lang.String)
     * @param dumpName the name of the dump to copy. Should exist locally, and not
     *          remotely.
     * @param remoteControllerName the remote controller to talk to.
     * @throws VirtualDatabaseException in case of error.
     */
    public void copyDump(String dumpName, String remoteControllerName)
            throws VirtualDatabaseException {
        transferDump(dumpName, remoteControllerName, false);
    }

    /**
     * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#copyLogFromCheckpoint(java.lang.String,
     *      java.lang.String)
     */
    public void copyLogFromCheckpoint(String dumpName,
            String controllerName) throws VirtualDatabaseException {
        // perform basic error checks (in particular, on success, we have a
        // recovery log)
        super.copyLogFromCheckpoint(dumpName, controllerName);

        Member controllerByName = getControllerByName(controllerName);
        if (isLocalSender(controllerByName))
            throw new VirtualDatabaseException(
                    "A restore log command must be applied to a remote controller");

        // get the checkpoint name from the dump info, or die
        String dumpCheckpointName;
        DumpInfo dumpInfo;

        try {
            dumpInfo = getRecoveryLog().getDumpInfo(dumpName);
        } catch (SQLException e) {
            throw new VirtualDatabaseException(
                    "Recovery log access error occurred while checking for dump "
                            + dumpName, e);
        }

        if (dumpInfo == null)
            throw new VirtualDatabaseException(
                    "No information was found in the dump table for dump "
                            + dumpName);

        RestoreLogOperation restoreLogOperation = new RestoreLogOperation(
                dumpName, controllerName);
        addAdminOperation(restoreLogOperation);
        try {
            dumpCheckpointName = dumpInfo.getCheckpointName();

            // set a global 'now' checkpoint (temporary) and suspend all activities
            String nowCheckpointName = setLogReplicationCheckpoint(controllerName);

            // AT THIS POINT, ALL ACTIVITIES ARE SUSPENDED

            // get its id so that we can replicate it on the other side
            long nowCheckpointId;
            RecoveryLog recoveryLog = getRequestManager()
                    .getRecoveryLog();
            try {
                try {
                    nowCheckpointId = recoveryLog
                            .getCheckpointLogId(nowCheckpointName);
                } catch (SQLException e) {
                    String errorMessage = "Cannot find 'now checkpoint' log entry";
                    logger.error(errorMessage, e);
                    throw new VirtualDatabaseException(errorMessage, e);
                }

                // initiate the replication - clears the remote recovery log.
                sendMessageToController(controllerByName,
                        new ReplicateLogEntries(nowCheckpointName,
                                null, dumpName, nowCheckpointId),
                        messageTimeouts.getReplicateLogEntriesTimeout());
            } finally {
                getRequestManager().resumeActivity();
            }

            // SCHEDULER ACTIVITIES ARE RESUMED AT THIS POINT

            // protect from concurrent log updates: fake a recovery (increments
            // semaphore)
            recoveryLog.beginRecovery();

            // copy the entries over to the remote controller.
            // Send them one by one, because each LogEntry can potentially be
            // huge (e.g. if it contains a blob)
            try {
                ArrayList dest = new ArrayList();
                dest.add(controllerByName);
                long copyLogEntryTimeout = getMessageTimeouts()
                        .getCopyLogEntryTimeout();
                long dumpId = recoveryLog
                        .getCheckpointLogId(dumpCheckpointName);

                if (logger.isDebugEnabled()) {
                    logger.debug("Resynchronizing from checkpoint "
                            + dumpCheckpointName + " (" + dumpId
                            + ") to checkpoint " + nowCheckpointName
                            + " (" + nowCheckpointId + ")");
                }

                for (long id = dumpId; id < nowCheckpointId; id++) {
                    LogEntry entry = recoveryLog.getNextLogEntry(id);
                    if (entry == null) {
                        // No more entries available, stop here
                        break;
                    }

                    // Because 'getNextLogEntry()' will hunt for the next valid log entry,
                    // we need to update the iterator with the new id value - 1
                    id = entry.getLogId() - 1;

                    MulticastResponse resp = getMulticastRequestAdapter()
                            .multicastMessage(dest,
                                    new CopyLogEntry(entry),
                                    MulticastRequestAdapter.WAIT_NONE,
                                    copyLogEntryTimeout);
                    if (resp.getFailedMembers() != null)
                        // Report the id of the entry itself; 'id' is one behind it here
                        throw new IOException(
                                "Failed to deliver log entry "
                                        + entry.getLogId()
                                        + " to remote controller "
                                        + controllerName);
                }

                // Now check that no entry was missed by the other controller since we
                // shipped all entries asynchronously without getting any individual ack
                // (much faster to address SEQUOIA-504)
                long localNbOfLogEntries = recoveryLog
                        .getNumberOfLogEntries(dumpId, nowCheckpointId);

                if (logger.isDebugEnabled()) {
                    logger.debug("Checking that "
                            + localNbOfLogEntries
                            + " entries were resynchronized in remote log");
                }

                Serializable replyValue = sendMessageToController(
                        controllerByName,
                        new CompleteRecoveryLogResync(dumpId,
                                nowCheckpointName, localNbOfLogEntries),
                        getMessageTimeouts()
                                .getReplicateLogEntriesTimeout());
                if (replyValue instanceof Long) {
                    long diff = ((Long) replyValue).longValue();
                    if (diff != 0)
                        throw new VirtualDatabaseException(
                                "Recovery log resynchronization reports a difference of "
                                        + diff + " entries");
                } else
                    throw new RuntimeException(
                            "Invalid answer from remote controller on CompleteRecoveryLogResync ("
                                    + replyValue + ")");

                // terminate the replication - sets the remote dump checkpoint name.
                sendMessageToController(controllerName,
                        new ReplicateLogEntries(null,
                                dumpCheckpointName, dumpName, dumpId),
                        messageTimeouts.getReplicateLogEntriesTimeout());
            } catch (Exception e) {
                String errorMessage = "Failed to send log entries";
                logger.error(errorMessage, e);
                // Keep the original failure as the cause
                throw new VirtualDatabaseException(errorMessage, e);
            } finally {
                recoveryLog.endRecovery(); // release semaphore
            }
        } finally {
            removeAdminOperation(restoreLogOperation);
        }
    }
2327:
2328:            /**
2329:             * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForPersistentConnection(long)
2330:             */
2331:            public void failoverForPersistentConnection(
2332:                    long persistentConnectionId) {
2333:                distributedRequestManager
2334:                        .distributedFailoverForPersistentConnection(persistentConnectionId);
2335:            }
2336:
2337:            /**
2338:             * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForTransaction(long)
2339:             */
2340:            public void failoverForTransaction(long currentTid) {
2341:                distributedRequestManager
2342:                        .distributedFailoverForTransaction(currentTid);
2343:            }
2344:
2345:            /**
2346:             * Returns the recovery log associated with this controller.
2347:             *
2348:             * @return the recovery log associated with this controller.
2349:             * @throws VirtualDatabaseException if the database has no recovery log
2350:             */
2351:            public RecoveryLog getRecoveryLog() throws VirtualDatabaseException {
2352:                if (!hasRecoveryLog())
2353:                    throw new VirtualDatabaseException(Translate
2354:                            .get("virtualdatabase.no.recovery.log"));
2355:
2356:                return getRequestManager().getRecoveryLog();
2357:            }
2358:
2359:            /**
2360:             * Update remote backends list after a backend disable notification has been
2361:             * received.
2362:             *
2363:             * @param disabledBackend backend that is disabled
2364:             * @param sender the message sender
2365:             */
2366:            public void handleRemoteDisableBackendNotification(
2367:                    DatabaseBackend disabledBackend, Member sender) {
2368:                synchronized (backendsPerController) {
2369:                    List remoteBackends = (List) backendsPerController
2370:                            .get(sender);
2371:                    if (remoteBackends == null) { // This case was reported by Alessandro Gamboz on April 1, 2005.
2372:                        // It looks like the EnableBackend message arrives before membership
2373:                        // has been properly updated.
2374:                        logger
2375:                                .warn("No information has been found for remote controller "
2376:                                        + sender);
2377:                        remoteBackends = new ArrayList();
2378:                        backendsPerController.put(sender, remoteBackends);
2379:                    }
2380:                    int size = remoteBackends.size();
2381:                    boolean backendFound = false;
2382:                    for (int i = 0; i < size; i++) {
2383:                        DatabaseBackend remoteBackend = (DatabaseBackend) remoteBackends
2384:                                .get(i);
2385:                        if (remoteBackend.equals(disabledBackend)) {
2386:                            logger.info("Backend " + remoteBackend.getName()
2387:                                    + " disabled on controller " + sender);
2388:                            remoteBackends.set(i, disabledBackend);
2389:                            backendFound = true;
2390:                            break;
2391:                        }
2392:                    }
2393:                    if (!backendFound) {
2394:                        logger
2395:                                .warn("Updating backend list with unknown backend "
2396:                                        + disabledBackend.getName()
2397:                                        + " disabled on controller " + sender);
2398:                        remoteBackends.add(disabledBackend);
2399:                    }
2400:                }
2401:            }
2402:
2403:            /**
2404:             * Update remote backends list after a disable notification for a list of
2405:             * backends has been received.
2406:             *
2407:             * @param disabledBackendInfos List of BackendInfo objects that are disabled
2408:             * @param sender the message sender
2409:             */
2410:            public void handleRemoteDisableBackendsNotification(
2411:                    ArrayList disabledBackendInfos, Member sender) {
2412:                synchronized (backendsPerController) {
2413:                    List remoteBackends = (List) backendsPerController
2414:                            .get(sender);
2415:                    if (remoteBackends == null) { // This case was reported by Alessandro Gamboz on April 1, 2005.
2416:                        // It looks like the EnableBackend message arrives before membership
2417:                        // has been properly updated.
2418:                        logger
2419:                                .warn("No information has been found for remote controller "
2420:                                        + sender);
2421:                        remoteBackends = new ArrayList();
2422:                        backendsPerController.put(sender, remoteBackends);
2423:                    }
2424:                    Iterator iter = disabledBackendInfos.iterator();
2425:                    while (iter.hasNext()) {
2426:                        BackendInfo backendInfo = (BackendInfo) iter.next();
2427:                        DatabaseBackend backend = backendInfo
2428:                                .getDatabaseBackend();
2429:
2430:                        if (remoteBackends.contains(backend)) {
2431:                            logger.info("Backend " + backend.getName()
2432:                                    + " disabled on controller " + sender);
2433:                            remoteBackends.set(remoteBackends.indexOf(backend),
2434:                                    backend);
2435:                        } else {
2436:                            remoteBackends.add(backend);
2437:                            logger
2438:                                    .warn("Updating backend list with unknown backend "
2439:                                            + backendInfo.getName()
2440:                                            + " disabled on controller "
2441:                                            + sender);
2442:                        }
2443:                    }
2444:                }
2445:            }
2446:
2447:            /**
2448:             * Send the local controller configuration to a remote controller
2449:             *
2450:             * @param dest the membership of the controller to send the information to
2451:             * @throws NotConnectedException if the group communication channel is not
2452:             *           connected
2453:             */
2454:            public void sendLocalConfiguration(Member dest)
2455:                    throws NotConnectedException {
2456:                // Send controller name to newcomer
2457:                if (logger.isDebugEnabled())
2458:                    logger
2459:                            .debug("Sending local controller name to joining controller ("
2460:                                    + dest + ")");
2461:
2462:                List target = new ArrayList();
2463:                target.add(dest);
2464:                multicastRequestAdapter.multicastMessage(target,
2465:                        new ControllerInformation(controller
2466:                                .getControllerName(), controller.getJmxName(),
2467:                                distributedRequestManager.getControllerId()),
2468:                        MulticastRequestAdapter.WAIT_ALL, messageTimeouts
2469:                                .getControllerNameTimeout());
2470:
2471:                // Send backend status
2472:                if (logger.isDebugEnabled()) {
2473:                    logger
2474:                            .debug("Sending backend status to joining controller ("
2475:                                    + dest + ")");
2476:                }
2477:                List backendInfos = DatabaseBackend.toBackendInfos(backends);
2478:                multicastRequestAdapter.multicastMessage(target,
2479:                        new BackendStatus(backendInfos,
2480:                                distributedRequestManager.getControllerId()),
2481:                        MulticastRequestAdapter.WAIT_ALL, messageTimeouts
2482:                                .getBackendStatusTimeout());
2483:            }
2484:
2485:            /**
2486:             * Returns true if the corresponding controller is alive
2487:             *
2488:             * @param controllerId Id of the controller to check
2489:             * @return true if controller is in controllerIds list
2490:             */
2491:            public boolean isAliveController(Long controllerId) {
2492:                return controllerIds.containsValue(controllerId);
2493:            }
2494:
2495:            /**
2496:             * Returns true if the given member is ourselves.
2497:             *
2498:             * @param sender the sender
2499:             * @return true if we are the sender, false otherwise
2500:             */
2501:            public boolean isLocalSender(Member sender) {
2502:                return channel.getLocalMembership().equals(sender);
2503:            }
2504:
2505:            /**
2506:             * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#removeBackend(java.lang.String)
2507:             */
2508:            public void removeBackend(String backend)
2509:                    throws VirtualDatabaseException {
2510:                super.removeBackend(backend);
2511:
2512:                try {
2513:                    // Send a group message to update backend list
2514:                    broadcastBackendInformation(getAllMemberButUs());
2515:                } catch (Exception e) {
2516:                    String msg = "An error occurred while multicasting new backend information";
2517:                    logger.error(msg, e);
2518:                    throw new VirtualDatabaseException(msg, e);
2519:                }
2520:            }
2521:
2522:            /**
2523:             * Remove a remote controller (usually because it has failed) from the
2524:             * controllerMap list and refresh the group membership. This also starts a
2525:             * ControllerFailureCleanupThread.
2526:             *
2527:             * @param remoteControllerMembership the membership identifying the remote
2528:             *          controller
2529:             * @return the JMX name of the removed controller (or null if this controller
2530:             *         was not in the list)
2531:             */
2532:            private String removeRemoteControllerAndStartCleanupThread(
2533:                    Member remoteControllerMembership) {
2534:                String remoteControllerJmxName = (String) controllerJmxAddress
2535:                        .remove(remoteControllerMembership);
2536:                if (logger.isDebugEnabled())
2537:                    logger.debug("Removing controller "
2538:                            + remoteControllerJmxName);
2539:
2540:                // Remove the list of remote backends since they are no longer reachable
2541:                backendsPerController.remove(remoteControllerMembership);
2542:                refreshGroupMembership();
2543:
2544:                // Resume ongoing activity suspensions originated from the failed controller
2545:                resumeOngoingActivitySuspensions(remoteControllerMembership);
2546:
2547:                // Retrieve id of controller that failed and start a cleanup thread to
2548:                // eliminate any remaining transactions/remaining persistent connections if
2549:                // no client failover occurs in the defined timeframe
2550:                Long failedControllerId = (Long) controllerIds
2551:                        .remove(remoteControllerMembership);
2552:
2553:                if (failedControllerId != null) {
2554:                    ControllerFailureCleanupThread controllerFailureCleanupThread = new ControllerFailureCleanupThread(
2555:                            this, failedControllerId.longValue(),
2556:                            failoverTimeoutInMs, cleanupThreads,
2557:                            groupCommunicationMessagesLocallyFlushed);
2558:                    cleanupThreads.put(failedControllerId,
2559:                            controllerFailureCleanupThread);
2560:                    controllerFailureCleanupThread.start();
2561:                }
2562:
2563:                return remoteControllerJmxName;
2564:            }
2565:
2566:            /**
2567:             * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#removeVirtualDatabaseUser(org.continuent.sequoia.common.users.VirtualDatabaseUser)
2568:             */
2569:            private void removeVirtualDatabaseUser(VirtualDatabaseUser vdbUser) {
2570:                try {
2571:                    multicastRequestAdapter.multicastMessage(getAllMembers(),
2572:                            new RemoveVirtualDatabaseUser(vdbUser),
2573:                            MulticastRequestAdapter.WAIT_NONE, messageTimeouts
2574:                                    .getVirtualDatabaseConfigurationTimeout());
2575:                } catch (NotConnectedException e) {
2576:                    logger.error("Channel unavailable while removing vdb user "
2577:                            + vdbUser.getLogin(), e);
2578:                    return;
2579:                }
2580:            }
2581:
2582:            /**
2583:             * Set a cluster-wide checkpoint relying on the implementation of
2584:             * SetCheckpoint to atomically set the checkpoint on all controllers. This
2585:             * method leaves writes, transactions, and persistent connections suspended.
2586:             * The caller must call RequestManager.resumeActivity() when the processing
2587:             * associated with this checkpoint is complete.
2588:             *
2589:             * @param checkpointName the name of the (transfer) checkpoint to create
2590:             * @param groupMembers an ArrayList of target Members
2591:             * @throws VirtualDatabaseException in case of scheduler or recoveryLog
2592:             *           exceptions
2593:             * @see SetCheckpointAndResumeTransactions
2594:             */
2595:            public void setGroupCheckpoint(String checkpointName,
2596:                    ArrayList groupMembers) throws VirtualDatabaseException {
2597:                try {
2598:                    // First suspend transactions
2599:                    distributedRequestManager.suspendActivity();
2600:                    getMulticastRequestAdapter().multicastMessage(
2601:                            groupMembers,
2602:                            new DisableBackendsAndSetCheckpoint(
2603:                                    new ArrayList(), checkpointName),
2604:                            MulticastRequestAdapter.WAIT_ALL,
2605:                            messageTimeouts.getSetCheckpointTimeout());
2606:                } catch (Exception e) {
2607:                    String msg = "Set group checkpoint failed: checkpointName="
2608:                            + checkpointName;
2609:                    logger.error(msg, e);
2610:                    throw new VirtualDatabaseException(msg, e);
2611:                }
2612:            }
2613:
2614:            /**
2615:             * Sets an atomic (group-wide) checkpoint on local & target controllers
2616:             * (referenced by Name).
2617:             *
2618:             * @param controllerName the target remote controller
2621:             * @return the 'now' checkpoint name.
2622:             * @throws VirtualDatabaseException in case of error (whatever error, wraps
2623:             *           the underlying error)
2624:             */
2625:            private String setLogReplicationCheckpoint(String controllerName)
2626:                    throws VirtualDatabaseException {
2627:                return setLogReplicationCheckpoint(getControllerByName(controllerName));
2628:            }
2629:
2630:            /**
2631:             * Sets an atomic (group-wide) checkpoint on local & target controllers
2632:             * (referenced by Name). When this call completes successfully, all activity on
2633:             * the system is suspended. RequestManager.resumeActivity() must be called to
2634:             * resume processing.
2635:             *
2636:             * @param controller the target remote controller
2637:             * @return the 'now' checkpoint name.
2638:             * @throws VirtualDatabaseException in case of error (whatever error, wraps
2639:             *           the underlying error)
2640:             */
2641:            public String setLogReplicationCheckpoint(Member controller)
2642:                    throws VirtualDatabaseException {
2643:                String checkpointName = buildCheckpointName("now");
2644:
2645:                // Apply checkpoint to remote controllers
2646:                ArrayList dest = new ArrayList();
2647:                dest.add(controller);
2648:                dest.add(channel.getLocalMembership());
2649:                setGroupCheckpoint(checkpointName, dest);
2650:                return checkpointName;
2651:            }
2652:
2653:            /**
2654:             * Sets an atomic (group-wide) checkpoint on all controllers, indicating that
2655:             * this vdb has shut down.
2656:             */
2657:            public void setShutdownCheckpoint() {
2658:                // Set a cluster-wide checkpoint
2659:                try {
2660:                    setGroupCheckpoint(buildCheckpointName("shutdown"),
2661:                            getAllMembers());
2662:                } catch (VirtualDatabaseException e) {
2663:                    logger.warn("Error while setting shutdown checkpoint", e);
2664:                } finally {
2665:                    if (isShuttingDown())
2666:                        setRejectingNewTransaction(true);
2667:                    distributedRequestManager.resumeActivity();
2668:                }
2669:            }
2670:
2671:            /**
2672:             * Send a Message to a remote controller, referenced by name. This sends a
2673:             * point-to-point message, fifo. No total order is specifically required.
2674:             *
2675:             * @param controllerName name of the remote controller
2676:             * @param message the message to send (should be Serializable)
2677:             * @param timeout message timeout in ms
2678:             * @throws VirtualDatabaseException (wrapping error) in case of communication
2679:             *           failure
2680:             */
2681:            private void sendMessageToController(String controllerName,
2682:                    Serializable message, long timeout)
2683:                    throws VirtualDatabaseException {
2684:                sendMessageToController(getControllerByName(controllerName),
2685:                        message, timeout);
2686:            }
2687:
2688:            /**
2689:             * Send a Message to a remote controller, referenced by its Member. This sends
2690:             * a point-to-point message, fifo. No total order is specifically required
2691:             * (but enforced anyway).
2692:             *
2693:             * @param controllerMember Member object referring to the remote controller
2694:             * @param message the message to send (should be Serializable)
2695:             * @param timeout message timeout in ms
2696:             * @return the result returned by the remote controller (except if this is an
2697:             *         exception in which case it is automatically thrown)
2698:             * @throws VirtualDatabaseException (wrapping error) in case of communication
2699:             *           failure
2700:             */
2701:            public Serializable sendMessageToController(
2702:                    Member controllerMember, Serializable message, long timeout)
2703:                    throws VirtualDatabaseException {
2704:                try {
2705:                    ArrayList dest = new ArrayList();
2706:                    dest.add(controllerMember);
2707:                    MulticastResponse resp = getMulticastRequestAdapter()
2708:                            .multicastMessage(dest, message,
2709:                                    MulticastRequestAdapter.WAIT_ALL, timeout);
2710:                    Object o = resp.getResult(controllerMember);
2711:                    if (o instanceof Exception)
2712:                        throw (Exception) o;
2713:                    return (Serializable) o;
2714:                } catch (Exception e) {
2715:                    logger.error(e);
2716:                    throw new VirtualDatabaseException(e);
2717:                }
2718:            }
2719:
2720:            /**
2721:             * Send a Message to a set of controllers. This sends a multicast message,
2722:             * fifo and checks for returned exceptions. No total order is specifically
2723:             * required (but enforced anyway). If an exception is returned, it is
2724:             * rethrown.
2725:             *
2726:             * @param members ArrayList of Member object referring to controllers
2727:             * @param message the message to send (should be Serializable)
2728:             * @param timeout message timeout in ms
2729:             * @return the result returned (except if one controller returned an exception
2730:             *         in which case it is automatically thrown)
2731:             * @throws VirtualDatabaseException (wrapping error) in case of communication
2732:             *           failure
2733:             */
2734:            public MulticastResponse sendMessageToControllers(
2735:                    ArrayList members, Serializable message, long timeout)
2736:                    throws VirtualDatabaseException {
2737:                try {
2738:                    MulticastResponse resp = getMulticastRequestAdapter()
2739:                            .multicastMessage(members, message,
2740:                                    MulticastRequestAdapter.WAIT_ALL, timeout);
2741:                    Iterator it = resp.getResults().keySet().iterator();
2742:                    while (it.hasNext()) {
2743:                        Object o = resp.getResults().get(it.next());
2744:                        // TODO: return compound exception instead of the first one
2745:                        if (o instanceof Exception)
2746:                            throw (Exception) o;
2747:                    }
2748:                    return resp;
2749:                } catch (Exception e) {
2750:                    logger.error(e);
2751:                    throw new VirtualDatabaseException(e);
2752:                }
2753:            }
2754:
2755:            /**
2756:             * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#shutdown(int)
2757:             */
2758:            public void shutdown(int level) {
2759:                ActivityService.getInstance().reset(name);
2760:                if (partitionReconciler != null) {
2761:                    partitionReconciler.dispose();
2762:                }
2763:
2764:                // Shutdown cleanup threads
2765:                for (Iterator iter = cleanupThreads.values().iterator(); iter
2766:                        .hasNext();) {
2767:                    ControllerFailureCleanupThread thread = (ControllerFailureCleanupThread) iter
2768:                            .next();
2769:                    thread.shutdown();
2770:                }
2771:
2772:                // Shutdown request result failover cache clean-up thread
2773:                requestResultFailoverCache.shutdown();
2774:
2775:                super.shutdown(level);
2776:            }
2777:
2778:            /**
2779:             * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#transferBackend(java.lang.String,
2780:             *      java.lang.String)
2781:             */
2782:            public void transferBackend(String backend,
2783:                    String controllerDestination)
2784:                    throws VirtualDatabaseException {
2785:                TransferBackendOperation transferOperation = new TransferBackendOperation(
2786:                        backend, controllerDestination);
2787:                addAdminOperation(transferOperation);
2788:                try {
2789:                    Member targetMember = getControllerByName(controllerDestination);
2790:
2791:                    // Get reference on backend
2792:                    DatabaseBackend db = getAndCheckBackend(backend,
2793:                            CHECK_BACKEND_DISABLE);
2794:                    String transfertCheckpointName = buildCheckpointName("transfer backend: "
2795:                            + db.getName()
2796:                            + " from "
2797:                            + controller.getControllerName()
2798:                            + " to "
2799:                            + targetMember.getUid());
2800:
2801:                    if (logger.isDebugEnabled())
2802:                        logger.debug("**** Disabling backend for transfer");
2803:
2804:                    // Disable local backend
2805:                    try {
2806:                        if (!hasRecoveryLog())
2807:                            throw new VirtualDatabaseException(
2808:                                    "Transfer is not supported on virtual databases without a recovery log");
2809:
2810:                        distributedRequestManager.disableBackendWithCheckpoint(
2811:                                db, transfertCheckpointName);
2812:                    } catch (SQLException e) {
2813:                        throw new VirtualDatabaseException(e.getMessage());
2814:                    }
2815:
2816:                    // Enable remote transferred backend.
2817:                    try {
2818:                        if (logger.isDebugEnabled())
2819:                            logger.debug("**** Sending transfer message to:"
2820:                                    + targetMember);
2821:
2822:                        ArrayList dest = new ArrayList(1);
2823:                        dest.add(targetMember);
2824:
2825:                        sendMessageToController(targetMember,
2826:                                new BackendTransfer(controllerDestination,
2827:                                        transfertCheckpointName,
2828:                                        new BackendInfo(db)), messageTimeouts
2829:                                        .getBackendTransferTimeout());
2830:
2831:                        if (logger.isDebugEnabled())
2832:                            logger.debug("**** Removing local backend");
2833:
2834:                        // Remove backend from this controller
2835:                        removeBackend(db);
2836:
2837:                        // Broadcast updated backend list
2838:                        broadcastBackendInformation(getAllMemberButUs());
2839:                    } catch (Exception e) {
2840:                        String msg = "An error occurred while transferring the backend";
2841:                        logger.error(msg, e);
2842:                        throw new VirtualDatabaseException(msg, e);
2843:                    }
2844:                } finally {
2845:                    removeAdminOperation(transferOperation);
2846:                }
2847:            }
2848:
2849:            /**
2850:             * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#transferDump(java.lang.String,
2851:             *      java.lang.String, boolean)
2852:             */
2853:            public void transferDump(String dumpName,
2854:                    String remoteControllerName, boolean noCopy)
2855:                    throws VirtualDatabaseException {
2856:                TransferDumpOperation transferOperation = new TransferDumpOperation(
2857:                        dumpName, remoteControllerName);
2858:                addAdminOperation(transferOperation);
2859:                try {
2860:                    // get the info from the backuper
2861:                    DumpInfo dumpInfo = null;
2862:                    try {
2863:                        dumpInfo = getRecoveryLog().getDumpInfo(dumpName);
2864:                        /*
2865:                         * getDumpInfo() is the one that throws SQLException (should it be a
2866:                         * VirtualDatabaseException instead ???)
2867:                         */
2868:                    } catch (SQLException e) {
2869:                        String msg = "getting dump info from backup manager failed";
2870:                        throw new VirtualDatabaseException(msg, e);
2871:                    }
2872:
2873:                    if (dumpInfo == null)
2874:                        throw new VirtualDatabaseException(
2875:                                "no dump info for dump '" + dumpName + "'");
2876:
2877:                    if (remoteControllerName.equals(controller.getJmxName()))
2878:                        throw new VirtualDatabaseException(
2879:                                "Not transferring dump to myself");
2880:
2881:                    // if a copy is needed, hand-off copy to backuper: setup server side of
2882:                    // the copy
2884:                    DumpTransferInfo dumpTransferInfo = null;
2885:                    if (!noCopy) {
2886:                        try {
2887:                            dumpTransferInfo = getRequestManager()
2888:                                    .getBackupManager().getBackuperByFormat(
2889:                                            dumpInfo.getDumpFormat())
2890:                                    .setupDumpServer();
2891:                        } catch (IOException e) {
2892:                            throw new VirtualDatabaseException(e);
2893:                        }
2894:                    }
2895:
2896:                    // send message to remote vdb instance, to act as a client
2897:                    // (see handleInitiateDumpCopy)
2898:                    sendMessageToController(remoteControllerName,
2899:                            new InitiateDumpCopy(dumpInfo, dumpTransferInfo),
2900:                            messageTimeouts.getInitiateDumpCopyTimeout());
2901:                } finally {
2902:                    removeAdminOperation(transferOperation);
2903:                }
2904:            }
2905:
    /**
     * If we are executing in a distributed virtual database, we have to make sure
     * that we post the query in the queue following the total order. This method
     * does not remove the request from the total order queue. You have to call
     * removeHeadFromAndNotifyTotalOrderQueue() to do so.
     *
     * @param request the request to wait for (can be any object but usually a
     *          DistributedRequest, Commit or Rollback)
     * @param errorIfNotFound true if an error message should be logged if the
     *          request is not found in the total order queue
     * @return true if the element was found and wait has succeeded, false
     *         otherwise
     */
    public boolean waitForTotalOrder(Object request,
            boolean errorIfNotFound) {
        synchronized (totalOrderQueue) {
            int index = totalOrderQueue.indexOf(request);
            while (index > 0) {
                if (logger.isDebugEnabled())
                    logger.debug("Waiting for " + index
                            + " queries to execute (current is "
                            + totalOrderQueue.get(0) + ")");
                try {
                    totalOrderQueue.wait();
                } catch (InterruptedException ignore) {
                }
                index = totalOrderQueue.indexOf(request);
            }
            if (index == -1) {
                if (errorIfNotFound)
                    logger.error("Request was not found in total order queue, posting out of order ("
                            + request + ")");
                return false;
            } else
                return true;
        }
    }

    /**
     * Variant of the waitForTotalOrder() method; please refer to its
     * documentation. The difference here is that we only wait while a request of
     * type SuspendWritesMessage is in front of the given request. All other
     * requests are bypassed.
     *
     * @see ResumeActivity
     * @see BlockActivity
     * @see SuspendActivity
     * @param request the request to wait for (expected to be a
     *          ResumeActivityMessage)
     * @param errorIfNotFound true if an error message should be logged if the
     *          request is not found in the total order queue
     * @return true if the element was found and wait has succeeded, false
     *         otherwise
     */
    public boolean waitForBlockAndSuspendInTotalOrder(Object request,
            boolean errorIfNotFound) {
        synchronized (totalOrderQueue) {
            // Init: check whether a SuspendWritesMessage is queued ahead of us
            int index = totalOrderQueue.indexOf(request);
            boolean shouldWait = false;
            for (int i = 0; i < index; i++)
                if (totalOrderQueue.get(i) instanceof SuspendWritesMessage)
                    shouldWait = true;

            // Main loop
            while ((index > 0) && shouldWait) {
                if (logger.isDebugEnabled())
                    logger.debug("Waiting for " + index
                            + " queries to execute (current is "
                            + totalOrderQueue.get(0) + ")");
                try {
                    totalOrderQueue.wait();
                } catch (InterruptedException ignore) {
                }
                // Re-evaluate our position and the messages still ahead of us
                index = totalOrderQueue.indexOf(request);
                shouldWait = false;
                for (int i = 0; i < index; i++)
                    if (totalOrderQueue.get(i) instanceof SuspendWritesMessage)
                        shouldWait = true;
            }

            if (index == -1) {
                if (errorIfNotFound)
                    logger.error("Request was not found in total order queue, posting out of order ("
                            + request + ")");
                return false;
            } else
                return true;
        }
    }

    /**
     * Adds an ongoing activity suspension marker to the current list only if
     * suspension was not triggered by the local member.
     *
     * @param controllerMember member which triggered the activity suspension
     */
    public void addOngoingActivitySuspension(Member controllerMember) {
        if (controllerMember.equals(multicastRequestAdapter
                .getChannel().getLocalMembership()))
            return;
        ongoingActivitySuspensions.add(controllerMember);
    }

    /**
     * Removes an ongoing activity suspension marker from the current list only if
     * resuming was not triggered by the local member.
     *
     * @param controllerMember member which triggered the activity resuming
     */
    public void removeOngoingActivitySuspension(Member controllerMember) {
        if (controllerMember.equals(multicastRequestAdapter
                .getChannel().getLocalMembership()))
            return;
        ongoingActivitySuspensions.remove(controllerMember);
    }

    // Resume ongoing activity suspensions originated from the failed controller
    private void resumeOngoingActivitySuspensions(
            Member controllerMember) {
        List dest = new ArrayList();
        dest.add(multicastRequestAdapter.getChannel()
                .getLocalMembership());

        for (Iterator iter = ongoingActivitySuspensions.iterator(); iter
                .hasNext();) {
            Member m = (Member) iter.next();
            if (m.equals(controllerMember))
                try {
                    multicastRequestAdapter.multicastMessage(dest,
                            new ResumeActivity(),
                            MulticastRequestAdapter.WAIT_ALL,
                            messageTimeouts.getDisableBackendTimeout());
                    // Need to remove the marker here because the sender of the
                    // ResumeActivity message will be the local member
                    iter.remove();
                } catch (NotConnectedException e) {
                    if (logger.isWarnEnabled())
                        logger.warn(
                                "Problem when trying to resume ongoing activity suspensions triggered by "
                                        + controllerMember, e);
                }
        }
    }
}
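
The waiting pattern used by waitForTotalOrder() can be illustrated in isolation. The following is a minimal sketch, not Sequoia code: the class TotalOrderQueueSketch and its methods enqueue, waitForTurn and removeHeadAndNotify are hypothetical names chosen for this example, and removeHeadAndNotify only stands in for the role the Javadoc assigns to removeHeadFromAndNotifyTotalOrderQueue(). It assumes a single shared list guarded by its own monitor. Unlike the listing above, the sketch remembers an interruption and restores the thread's interrupt flag once the wait is over.

```java
import java.util.LinkedList;
import java.util.List;

// Hypothetical, simplified illustration of head-of-queue waiting.
public class TotalOrderQueueSketch {
    private final List<Object> queue = new LinkedList<>();

    // Appends a request at the tail of the ordered queue.
    public void enqueue(Object request) {
        synchronized (queue) {
            queue.add(request);
        }
    }

    // Blocks until the request is at the head of the queue.
    // Returns false if the request is not (or no longer) in the queue.
    public boolean waitForTurn(Object request) {
        boolean interrupted = false;
        try {
            synchronized (queue) {
                int index = queue.indexOf(request);
                while (index > 0) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        interrupted = true; // remember it, keep waiting
                    }
                    index = queue.indexOf(request);
                }
                return index == 0;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the flag
            }
        }
    }

    // Removes the request if it is at the head and wakes up all waiters.
    public boolean removeHeadAndNotify(Object request) {
        synchronized (queue) {
            if (queue.isEmpty() || !queue.get(0).equals(request)) {
                return false;
            }
            queue.remove(0);
            queue.notifyAll();
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TotalOrderQueueSketch q = new TotalOrderQueueSketch();
        q.enqueue("first");
        q.enqueue("second");

        Thread worker = new Thread(() -> {
            q.waitForTurn("second"); // blocks until "first" is removed
            System.out.println("second may now run");
            q.removeHeadAndNotify("second");
        });
        worker.start();

        q.waitForTurn("first"); // already at the head, returns immediately
        System.out.println("first may now run");
        q.removeHeadAndNotify("first");
        worker.join();
    }
}
```

As in the original method, waiting and removal are separate steps: a caller that has finished its work must remove its own entry, otherwise every request queued behind it keeps waiting.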