0001: /**
0002: * Sequoia: Database clustering technology.
0003: * Copyright (C) 2002-2004 French National Institute For Research In Computer
0004: * Science And Control (INRIA).
0005: * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
0006: * Copyright (C) 2005-2006 Continuent, Inc.
0007: * Contact: sequoia@continuent.org
0008: *
0009: * Licensed under the Apache License, Version 2.0 (the "License");
0010: * you may not use this file except in compliance with the License.
0011: * You may obtain a copy of the License at
0012: *
0013: * http://www.apache.org/licenses/LICENSE-2.0
0014: *
0015: * Unless required by applicable law or agreed to in writing, software
0016: * distributed under the License is distributed on an "AS IS" BASIS,
0017: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018: * See the License for the specific language governing permissions and
0019: * limitations under the License.
0020: *
0021: * Initial developer(s): Emmanuel Cecchet.
0022: * Contributor(s): Olivier Fambon, Damian Arregui, Karl Cassaigne.
0023: */package org.continuent.sequoia.controller.virtualdatabase;
0024:
0025: import java.io.IOException;
0026: import java.io.InputStream;
0027: import java.io.Serializable;
0028: import java.sql.SQLException;
0029: import java.util.ArrayList;
0030: import java.util.Collection;
0031: import java.util.Collections;
0032: import java.util.HashMap;
0033: import java.util.Hashtable;
0034: import java.util.Iterator;
0035: import java.util.LinkedList;
0036: import java.util.List;
0037: import java.util.Map;
0038: import java.util.Properties;
0039: import java.util.Map.Entry;
0040:
0041: import org.continuent.hedera.adapters.MessageListener;
0042: import org.continuent.hedera.adapters.MulticastRequestAdapter;
0043: import org.continuent.hedera.adapters.MulticastRequestListener;
0044: import org.continuent.hedera.adapters.MulticastResponse;
0045: import org.continuent.hedera.channel.AbstractReliableGroupChannel;
0046: import org.continuent.hedera.channel.ChannelException;
0047: import org.continuent.hedera.channel.NotConnectedException;
0048: import org.continuent.hedera.common.Group;
0049: import org.continuent.hedera.common.GroupIdentifier;
0050: import org.continuent.hedera.common.IpAddress;
0051: import org.continuent.hedera.common.Member;
0052: import org.continuent.hedera.factory.AbstractGroupCommunicationFactory;
0053: import org.continuent.hedera.gms.AbstractGroupMembershipService;
0054: import org.continuent.hedera.gms.GroupMembershipListener;
0055: import org.continuent.sequoia.common.exceptions.ControllerException;
0056: import org.continuent.sequoia.common.exceptions.NoMoreBackendException;
0057: import org.continuent.sequoia.common.exceptions.SequoiaException;
0058: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
0059: import org.continuent.sequoia.common.i18n.Translate;
0060: import org.continuent.sequoia.common.jmx.management.BackendInfo;
0061: import org.continuent.sequoia.common.jmx.management.DumpInfo;
0062: import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList;
0063: import org.continuent.sequoia.common.log.Trace;
0064: import org.continuent.sequoia.common.sql.metadata.MetadataContainer;
0065: import org.continuent.sequoia.common.sql.schema.DatabaseSchema;
0066: import org.continuent.sequoia.common.users.VirtualDatabaseUser;
0067: import org.continuent.sequoia.common.util.Constants;
0068: import org.continuent.sequoia.common.xml.DatabasesXmlTags;
0069: import org.continuent.sequoia.controller.backend.DatabaseBackend;
0070: import org.continuent.sequoia.controller.backend.result.ControllerResultSet;
0071: import org.continuent.sequoia.controller.backup.DumpTransferInfo;
0072: import org.continuent.sequoia.controller.core.Controller;
0073: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
0074: import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
0075: import org.continuent.sequoia.controller.requestmanager.RAIDbLevels;
0076: import org.continuent.sequoia.controller.requestmanager.RequestManager;
0077: import org.continuent.sequoia.controller.requestmanager.distributed.ControllerFailureCleanupThread;
0078: import org.continuent.sequoia.controller.requestmanager.distributed.DistributedRequestManager;
0079: import org.continuent.sequoia.controller.requests.AbstractRequest;
0080: import org.continuent.sequoia.controller.virtualdatabase.activity.ActivityService;
0081: import org.continuent.sequoia.controller.virtualdatabase.management.RestoreLogOperation;
0082: import org.continuent.sequoia.controller.virtualdatabase.management.TransferBackendOperation;
0083: import org.continuent.sequoia.controller.virtualdatabase.management.TransferDumpOperation;
0084: import org.continuent.sequoia.controller.virtualdatabase.protocol.AddVirtualDatabaseUser;
0085: import org.continuent.sequoia.controller.virtualdatabase.protocol.BackendStatus;
0086: import org.continuent.sequoia.controller.virtualdatabase.protocol.BackendTransfer;
0087: import org.continuent.sequoia.controller.virtualdatabase.protocol.CompleteRecoveryLogResync;
0088: import org.continuent.sequoia.controller.virtualdatabase.protocol.ControllerInformation;
0089: import org.continuent.sequoia.controller.virtualdatabase.protocol.CopyLogEntry;
0090: import org.continuent.sequoia.controller.virtualdatabase.protocol.DisableBackendsAndSetCheckpoint;
0091: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedRequest;
0092: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedTransactionMarker;
0093: import org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage;
0094: import org.continuent.sequoia.controller.virtualdatabase.protocol.FlushGroupCommunicationMessages;
0095: import org.continuent.sequoia.controller.virtualdatabase.protocol.GetPreparedStatementMetadata;
0096: import org.continuent.sequoia.controller.virtualdatabase.protocol.GetStaticMetadata;
0097: import org.continuent.sequoia.controller.virtualdatabase.protocol.InitiateDumpCopy;
0098: import org.continuent.sequoia.controller.virtualdatabase.protocol.IsValidUserForAllBackends;
0099: import org.continuent.sequoia.controller.virtualdatabase.protocol.MessageTimeouts;
0100: import org.continuent.sequoia.controller.virtualdatabase.protocol.RemoveVirtualDatabaseUser;
0101: import org.continuent.sequoia.controller.virtualdatabase.protocol.ReplicateLogEntries;
0102: import org.continuent.sequoia.controller.virtualdatabase.protocol.ResumeActivity;
0103: import org.continuent.sequoia.controller.virtualdatabase.protocol.ResyncRecoveryLog;
0104: import org.continuent.sequoia.controller.virtualdatabase.protocol.SuspendWritesMessage;
0105: import org.continuent.sequoia.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration;
0106: import org.continuent.sequoia.controller.virtualdatabase.protocol.VirtualDatabaseConfigurationResponse;
0107:
0108: /**
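* A <code>DistributedVirtualDatabase</code> is a virtual database hosted by
* several controllers. Communication between the controllers is achieved with
* reliable multicast provided by the Hedera group communication layer. The
* underlying implementation (for example JGroups) is selected by the factory
* named in the Hedera properties file.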
0112: *
0113: * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
0114: * @author <a href="mailto:Damian.Arregui@continuent.com">Damian Arregui</a>
0115: * @version 1.0
0116: */
0117: public class DistributedVirtualDatabase extends VirtualDatabase
0118: implements MessageListener, MulticastRequestListener,
0119: GroupMembershipListener {
0120: //
0121: // How the code is organized ?
0122: //
0123: // 1. Member variables
0124: // 2. Constructor(s)
0125: // 3. Request handling
0126: // 4. Transaction handling
0127: // 5. Database backend management
0128: // 6. Distribution management (multicast)
0129: // 7. Getter/Setter (possibly in alphabetical order)
0130: //
0131:
// Distribution

/** Name of the group this virtual database joins (see joinGroup) */
private String groupName = null;

/**
* Hashtable<Member,String>, the String being the controller JMX name
* corresponding to the Member. The local controller adds itself when joining
* the group.
*/
private Hashtable controllerJmxAddress;

/**
* Hashtable<Member, Long>. NOTE(review): presumably maps each controller
* member to its controller identifier (the original comment read
* "Long<DatabaseBackend>", which is not a valid type) — TODO confirm.
*/
private Hashtable controllerIds;

/** Hashtable<Member, List<DatabaseBackend>> */
private Hashtable backendsPerController;

/** Hedera group channel, created and joined in joinGroup */
private AbstractReliableGroupChannel channel = null;

/** Hedera adapter used to multicast messages to the group */
private MulticastRequestAdapter multicastRequestAdapter = null;

// NOTE(review): usage of messageTimeouts is not visible in this section
private MessageTimeouts messageTimeouts;

/** Group obtained from the channel once it has joined */
private Group currentGroup = null;

/**
* Group members other than this controller. Set to an empty list when we are
* alone in the group; otherwise refreshed by refreshGroupMembership().
*/
private ArrayList allMemberButUs = null;

/**
* Used by VirtualDatabaseConfiguration if a remote controller config is not
* compatible. Compared with the configuration-check result when joining the
* group.
*/
public static final long INCOMPATIBLE_CONFIGURATION = -1;

/** Set to true in joinGroup once group communication messages may flow in */
private boolean isVirtualDatabaseStarted = false;

/**
* Our view of the request manager, same as super.requestManager, only just
* typed properly.
*/
private DistributedRequestManager distributedRequestManager;

/** Set to true by the constructor */
private boolean processMacroBeforeBroadcast;

/**
* Threads that are cleaning up resources allocated by a dead remote
* controller (stored in a Hashtable; key/value layout not visible here —
* TODO confirm).
*/
private Hashtable cleanupThreads;

/**
* Stores the "flushed writes" status for each failed controller: true if
* writes have been flushed by the controller failure clean-up thread or a vdb
* worker thread clean-up thread, false otherwise. Keyed by the failed
* controller id (a Long). Also used as the monitor that waiters block on.
*/
private HashMap groupCommunicationMessagesLocallyFlushed;

/**
* Maximum time in ms allowed for clients to failover in case of a controller
* failure
*/
private long failoverTimeoutInMs;

/**
* Cache of request results to allow transparent failover if a failure occurs
* during a write request.
*/
private RequestResultFailoverCache requestResultFailoverCache;

/** Logger for distributed request execution */
private Trace distributedRequestLogger;

/** Classpath resource name of the Hedera properties file */
private String hederaPropertiesFile;

/**
* Monitor guarding messagesInHandlers. The same object is returned by
* handleMessageSingleThreaded as a "not handled, shutting down" marker.
*/
private final Object MESSAGES_IN_HANDLER_SYNC = new Object();

/** Number of group messages currently inside the message handlers */
private int messagesInHandlers = 0;

/** When true, incoming group messages are no longer handled */
private boolean channelShuttingDown = false;

// NOTE(review): usage of isResynchingFlag is not visible in this section
private boolean isResynchingFlag;

// NOTE(review): usage of the two tables below is not visible in this section
private Hashtable controllerPersistentConnectionsRecovered = new Hashtable();

private Hashtable controllerTransactionsRecovered = new Hashtable();
/** Reconciler used during network partition reconciliation */
private PartitionReconciler partitionReconciler;

// NOTE(review): usage of ongoingActivitySuspensions is not visible here
private List ongoingActivitySuspensions = new ArrayList();

/**
* JVM-wide group communication factory, lazily created in joinGroup from the
* "hedera.factory" property and then shared by all instances.
*/
private static AbstractGroupCommunicationFactory groupCommunicationFactory = null;
0224:
0225: /**
0226: * Creates a new <code>DistributedVirtualDatabase</code> instance.
0227: *
0228: * @param controller the controller we belong to
0229: * @param name the virtual database name
0230: * @param groupName the virtual database group name
0231: * @param maxConnections maximum number of concurrent connections.
0232: * @param pool should we use a pool of threads for handling connections?
0233: * @param minThreads minimum number of threads in the pool
0234: * @param maxThreads maximum number of threads in the pool
0235: * @param maxThreadIdleTime maximum time a thread can remain idle before being
0236: * removed from the pool.
0237: * @param clientFailoverTimeoutInMs maximum time for clients to failover in
0238: * case of a controller failure
0239: * @param sqlShortFormLength maximum number of characters of an SQL statement
0240: * to display in traces or exceptions
0241: * @param useStaticResultSetMetaData true if DatabaseResultSetMetaData should
0242: * use static fields or try to fetch the metadata from the underlying
0243: * database
0244: * @param hederaPropertiesFile Hedera properties file defines the group
0245: * communication factory and its parameters
0246: */
0247: public DistributedVirtualDatabase(Controller controller,
0248: String name, String groupName, int maxConnections,
0249: boolean pool, int minThreads, int maxThreads,
0250: long maxThreadIdleTime, int sqlShortFormLength,
0251: long clientFailoverTimeoutInMs,
0252: boolean useStaticResultSetMetaData,
0253: String hederaPropertiesFile,
0254: boolean enforceTableExistenceIntoSchema) {
0255: super (controller, name, maxConnections, pool, minThreads,
0256: maxThreads, maxThreadIdleTime, sqlShortFormLength,
0257: useStaticResultSetMetaData,
0258: enforceTableExistenceIntoSchema);
0259:
0260: this .groupName = groupName;
0261: this .processMacroBeforeBroadcast = true;
0262: this .failoverTimeoutInMs = clientFailoverTimeoutInMs;
0263: requestResultFailoverCache = new RequestResultFailoverCache(
0264: logger, failoverTimeoutInMs);
0265: backendsPerController = new Hashtable();
0266: controllerJmxAddress = new Hashtable();
0267: controllerIds = new Hashtable();
0268: cleanupThreads = new Hashtable();
0269: groupCommunicationMessagesLocallyFlushed = new HashMap();
0270: isVirtualDatabaseStarted = false;
0271: distributedRequestLogger = Trace
0272: .getLogger("org.continuent.sequoia.controller.distributedvirtualdatabase.request."
0273: + name);
0274: this .hederaPropertiesFile = hederaPropertiesFile;
0275: this .totalOrderQueue = new LinkedList();
0276: }
0277:
0278: /**
0279: * Disconnect the channel and close it.
0280: *
0281: * @see java.lang.Object#finalize()
0282: */
0283: protected void finalize() throws Throwable {
0284: quitChannel();
0285: super .finalize();
0286: }
0287:
0288: /**
0289: * This method handle the scheduling part of the queries to be sure that the
0290: * query is scheduled in total order before letting other queries to execute.
0291: *
0292: * @see org.continuent.hedera.adapters.MulticastRequestListener#handleMessageSingleThreaded(java.io.Serializable,
0293: * org.continuent.hedera.common.Member)
0294: */
0295: public Object handleMessageSingleThreaded(Serializable msg,
0296: Member sender) {
0297: synchronized (MESSAGES_IN_HANDLER_SYNC) {
0298: if (channelShuttingDown)
0299: return MESSAGES_IN_HANDLER_SYNC;
0300: messagesInHandlers++;
0301: }
0302:
0303: try {
0304: if (msg != null) {
0305: if (logger.isDebugEnabled())
0306: logger.debug("handleMessageSingleThreaded ("
0307: + msg.getClass() + "): " + msg);
0308:
0309: if (msg instanceof DistributedVirtualDatabaseMessage) {
0310: return ((DistributedVirtualDatabaseMessage) msg)
0311: .handleMessageSingleThreaded(this , sender);
0312: }
0313: // Other message types will be handled in multithreaded handler
0314: } else {
0315: String errorMsg = "Invalid null message";
0316: logger.error(errorMsg);
0317: return new ControllerException(errorMsg);
0318: }
0319:
0320: return null;
0321: } catch (Exception e) {
0322: if (e instanceof RuntimeException)
0323: logger.warn("Error while handling group message:"
0324: + msg.getClass(), e);
0325: return e;
0326: }
0327: }
0328:
0329: /**
0330: * @see org.continuent.hedera.adapters.MulticastRequestListener#handleMessageMultiThreaded(java.io.Serializable,
0331: * org.continuent.hedera.common.Member, java.lang.Object)
0332: */
0333: public Serializable handleMessageMultiThreaded(Serializable msg,
0334: Member sender, Object handleMessageSingleThreadedResult) {
0335: // Check if we are shutting down first
0336: if (msg == MESSAGES_IN_HANDLER_SYNC)
0337: return null;
0338:
0339: try {
0340: if (msg == null) {
0341: String errorMsg = "Invalid null message";
0342: logger.error(errorMsg);
0343: return new ControllerException(errorMsg);
0344: }
0345:
0346: if (logger.isDebugEnabled())
0347: logger.debug("handleMessageMultiThreaded ("
0348: + msg.getClass() + "): " + msg);
0349: if (msg instanceof DistributedVirtualDatabaseMessage) {
0350: return ((DistributedVirtualDatabaseMessage) msg)
0351: .handleMessageMultiThreaded(this , sender,
0352: handleMessageSingleThreadedResult);
0353: } else
0354: logger.warn("Unhandled message type received: "
0355: + msg.getClass() + "(" + msg + ")");
0356:
0357: return null;
0358: } catch (Exception e) {
0359: if (e instanceof RuntimeException)
0360: logger.warn("Error while handling group message: "
0361: + msg.getClass(), e);
0362: return e;
0363: } finally {
0364: synchronized (MESSAGES_IN_HANDLER_SYNC) {
0365: messagesInHandlers--;
0366: if (messagesInHandlers <= 0)
0367: MESSAGES_IN_HANDLER_SYNC.notifyAll();
0368: }
0369:
0370: if (msg != null) {
0371: // Just in case something bad happen and the request was not properly
0372: // removed from the queue.
0373: if (msg instanceof DistributedRequest) {
0374: synchronized (totalOrderQueue) {
0375: if (totalOrderQueue
0376: .remove(((DistributedRequest) msg)
0377: .getRequest())) {
0378: logger
0379: .warn("Distributed request "
0380: + ((DistributedRequest) msg)
0381: .getRequest()
0382: .getSqlShortForm(
0383: getSqlShortFormLength())
0384: + " did not remove itself from the total order queue");
0385: totalOrderQueue.notifyAll();
0386: }
0387: }
0388: } else if (msg instanceof DistributedTransactionMarker) {
0389: synchronized (totalOrderQueue) {
0390: if (totalOrderQueue.remove(msg)) {
0391: logger
0392: .warn("Distributed "
0393: + msg.toString()
0394: + " did not remove "
0395: + "itself from the total order queue");
0396: totalOrderQueue.notifyAll();
0397: }
0398: }
0399: }
0400: }
0401: }
0402: }
0403:
0404: public void cancelMessage(Serializable msg) {
0405: if (msg instanceof DistributedVirtualDatabaseMessage) {
0406: ((DistributedVirtualDatabaseMessage) msg).cancel(this );
0407: } else
0408: logger.warn("Unhandled message type received: "
0409: + msg.getClass() + "(" + msg + ")");
0410: }
0411:
0412: /**
0413: * Flushes all messages in the group communication locally, i.e. ensures that
0414: * all previously scheduled messages are processed.
0415: *
0416: * @param failedControllerId controller whose suspected failure triggered the
0417: * flush operation.
0418: */
0419: public void flushGroupCommunicationMessagesLocally(
0420: long failedControllerId) {
0421: // Set "writes flushed" flag to false
0422: synchronized (groupCommunicationMessagesLocallyFlushed) {
0423: if (!groupCommunicationMessagesLocallyFlushed
0424: .containsKey(new Long(failedControllerId)))
0425: groupCommunicationMessagesLocallyFlushed.put(new Long(
0426: failedControllerId), Boolean.FALSE);
0427: }
0428:
0429: try {
0430: List dest = new ArrayList();
0431:
0432: if ((multicastRequestAdapter != null)
0433: && (multicastRequestAdapter.getChannel() != null)) {
0434: dest.add(multicastRequestAdapter.getChannel()
0435: .getLocalMembership());
0436:
0437: // Don't care about result
0438: multicastRequestAdapter.multicastMessage(dest,
0439: new FlushGroupCommunicationMessages(
0440: failedControllerId),
0441: MulticastRequestAdapter.WAIT_ALL, 0);
0442: }
0443: } catch (Throwable e) {
0444: logger
0445: .error(
0446: "Failed to flush group communication messages locally",
0447: e);
0448: } finally {
0449: // Set "writes flushed" flag to true and notify blocked vdb worker
0450: // threads
0451: synchronized (groupCommunicationMessagesLocallyFlushed) {
0452: groupCommunicationMessagesLocallyFlushed.put(new Long(
0453: failedControllerId), Boolean.TRUE);
0454: groupCommunicationMessagesLocallyFlushed.notifyAll();
0455: }
0456: }
0457: }
0458:
0459: /**
0460: * Waits for all messages in the group communication to be flushed locally.
0461: *
0462: * @param failedControllerId controller whose suspected failure triggered the
0463: * wait operation.
0464: */
0465: public void waitForGroupCommunicationMessagesLocallyFlushed(
0466: long failedControllerId) {
0467: Long controllerIdKey = new Long(failedControllerId);
0468:
0469: // Set "writes flushed" flag to false and wait for writes being flushed by
0470: // the controller failure clean-up thread or the vdb worker thread
0471: synchronized (groupCommunicationMessagesLocallyFlushed) {
0472: if (!groupCommunicationMessagesLocallyFlushed
0473: .containsKey(controllerIdKey))
0474: groupCommunicationMessagesLocallyFlushed.put(
0475: controllerIdKey, Boolean.FALSE);
0476:
0477: while (!((Boolean) groupCommunicationMessagesLocallyFlushed
0478: .get(controllerIdKey)).booleanValue()) {
0479: try {
0480: if (logger.isDebugEnabled()) {
0481: logger
0482: .debug("Will wait for writes to be flushed for failed controller "
0483: + controllerIdKey);
0484: }
0485: groupCommunicationMessagesLocallyFlushed.wait();
0486: } catch (InterruptedException e) {
0487: // Ignore exception
0488: }
0489: }
0490: }
0491: }
0492:
0493: /**
0494: * @see org.continuent.hedera.gms.GroupMembershipListener#failedMember(org.continuent.hedera.common.Member,
0495: * org.continuent.hedera.common.GroupIdentifier,
0496: * org.continuent.hedera.common.Member)
0497: */
0498: public void failedMember(Member failed, GroupIdentifier gid,
0499: Member sender) {
0500: quitMember(failed, gid);
0501: }
0502:
0503: /**
0504: * @see org.continuent.hedera.gms.GroupMembershipListener#groupComposition(org.continuent.hedera.common.Group,
0505: * org.continuent.hedera.common.IpAddress, int)
0506: */
0507: public void groupComposition(Group g, IpAddress sender,
0508: int gmsStatus) {
0509: // Just ignore
0510: }
0511:
0512: /**
0513: * @see org.continuent.hedera.gms.GroupMembershipListener#networkPartition(org.continuent.hedera.common.GroupIdentifier,
0514: * java.util.List)
0515: */
0516: public void networkPartition(GroupIdentifier gid,
0517: final List mergedGroupCompositions) {
0518: // We already left the group. Late notification. Ignore.
0519: if ((channel == null) || channelShuttingDown)
0520: return;
0521:
0522: if (gid.equals(currentGroup.getGroupIdentifier())) {
0523: String msg = "Network partition detected in group " + gid
0524: + ".";
0525: logger.error(msg);
0526: endUserLogger.error(msg);
0527:
0528: if (shuttingDown) {
0529: logger
0530: .warn("Do not start network reconciliation process since "
0531: + name + " is shutting down");
0532: return;
0533: }
0534:
0535: sendJmxNotification(
0536: SequoiaNotificationList.VDB_NETWORK_PARTITION_DETECTION,
0537: "A network partition has been detected for the virtual database "
0538: + name + " (gid=" + gid + ").");
0539: Thread t = new Thread() {
0540: public void run() {
0541: reconcile(mergedGroupCompositions);
0542: }
0543: };
0544: t.start();
0545: }
0546: }
0547:
0548: /**
0549: * Makes this virtual database join a virtual database group. Those groups are
0550: * mapped to JavaGroups groups.
0551: *
0552: * @exception Exception if an error occurs
0553: */
0554: public void joinGroup(boolean resyncRecoveryLog) throws Exception {
0555: RecoveryLog recoveryLog = distributedRequestManager
0556: .getRecoveryLog();
0557: if (recoveryLog == null) {
0558: String msg = "Distributed virtual database cannot be used without a recovery log defined.";
0559: if (logger.isFatalEnabled())
0560: logger.fatal(msg);
0561: throw new SequoiaException(msg);
0562: }
0563:
0564: logger.info("Recovery log size: "
0565: + recoveryLog.getRecoveryLogSize());
0566:
0567: try {
0568: Properties p = new Properties();
0569: InputStream is = this .getClass().getResourceAsStream(
0570: hederaPropertiesFile);
0571: if (is == null) {
0572: if (logger.isFatalEnabled())
0573: logger
0574: .fatal(Translate
0575: .get(
0576: "fatal.distributed.no.group.communication.properties",
0577: hederaPropertiesFile));
0578: endUserLogger
0579: .fatal(Translate
0580: .get(
0581: "fatal.distributed.no.group.communication.properties",
0582: hederaPropertiesFile));
0583: throw new SequoiaException(
0584: "Join group failed because Hedera properties file was not found.");
0585: }
0586: if (logger.isInfoEnabled())
0587: logger.info("Using Hedera properties file: "
0588: + hederaPropertiesFile);
0589: p.load(is);
0590: is.close();
0591:
0592: if (groupCommunicationFactory == null) {
0593: groupCommunicationFactory = (AbstractGroupCommunicationFactory) Class
0594: .forName(p.getProperty("hedera.factory"))
0595: .newInstance();
0596: }
0597: Object[] ret = groupCommunicationFactory
0598: .createChannelAndGroupMembershipService(p,
0599: new GroupIdentifier(groupName));
0600: AbstractGroupMembershipService gms = (AbstractGroupMembershipService) ret[1];
0601: gms.registerGroupMembershipListener(this );
0602: channel = (AbstractReliableGroupChannel) ret[0];
0603:
0604: if (logger.isDebugEnabled())
0605: logger
0606: .debug("Group communication channel is configured as follows: "
0607: + channel);
0608:
0609: // Join the group
0610: channel.join();
0611: currentGroup = channel.getGroup();
0612: multicastRequestAdapter = new MulticastRequestAdapter(
0613: channel // group
0614: // channel
0615: , this /* MessageListener */
0616: , this /* MulticastRequestListener */
0617: );
0618: multicastRequestAdapter.start();
0619:
0620: // Let the MulticastRequestAdapter thread pump the membership out of the
0621: // JGroups channel.
0622: Thread.sleep(2000);
0623:
0624: logger.info("Group " + groupName + " connected to "
0625: + channel.getLocalMembership());
0626:
0627: // Add ourselves to the list of controllers
0628: controllerJmxAddress.put(channel.getLocalMembership(),
0629: controller.getJmxName());
0630:
0631: long controllerId;
0632:
0633: // Check if we are alone or not
0634: List currentGroupMembers = currentGroup.getMembers();
0635: int groupSize = currentGroupMembers.size();
0636: if (groupSize == 1) {
0637: logger
0638: .info(Translate
0639: .get(
0640: "virtualdatabase.distributed.configuration.first.in.group",
0641: groupName));
0642:
0643: // Refuse to start if we are first but see no last-man-down marker
0644: if (!recoveryLog.isLastManDown())
0645: throw new VirtualDatabaseException(
0646: "NOT STARTING VDB (not the last man down).");
0647:
0648: allMemberButUs = new ArrayList();
0649: controllerId = 0;
0650: distributedRequestManager.setControllerId(controllerId);
0651: if (resyncRecoveryLog) {
0652: // Init backends states from persistence base
0653: distributedRequestManager
0654: .initBackendsLastKnownCheckpointFromRecoveryLog();
0655: recoveryLog.checkRecoveryLogConsistency();
0656: }
0657: } else {
0658: logger.info("Group now contains " + groupSize
0659: + " controllers.");
0660: if (logger.isDebugEnabled()) {
0661: logger
0662: .debug("Current list of controllers is as follows:");
0663: for (Iterator iter = currentGroupMembers.iterator(); iter
0664: .hasNext();)
0665: logger.debug("Controller " + iter.next());
0666: }
0667:
0668: refreshGroupMembership(); // also updates allMemberButUs
0669:
0670: // Check with the other controller that our config is compatible
0671: controllerId = checkConfigurationCompatibilityAndReturnControllerId(getAllMemberButUs());
0672: if (controllerId == INCOMPATIBLE_CONFIGURATION) {
0673: String msg = Translate
0674: .get("virtualdatabase.distributed.configuration.not.compatible");
0675: logger.error(msg);
0676: throw new ControllerException(msg);
0677: } else {
0678: // In case several controllers join at the same time they would get
0679: // the
0680: // same highest controller id value and here we discriminate them by
0681: // adding their position in the membership. This assumes that the
0682: // membership is ordered the same way at all nodes.
0683: controllerId += currentGroupMembers.indexOf(channel
0684: .getLocalMembership());
0685:
0686: if (logger.isInfoEnabled()) {
0687: logger
0688: .info(Translate
0689: .get("virtualdatabase.distributed.configuration.compatible"));
0690: logger.info("Controller identifier is set to: "
0691: + controllerId);
0692: }
0693: // Set the controller Id
0694: distributedRequestManager
0695: .setControllerId(controllerId);
0696: }
0697:
0698: if (resyncRecoveryLog) {
0699: // Init backends states from persistence base
0700: distributedRequestManager
0701: .initBackendsLastKnownCheckpointFromRecoveryLog();
0702: }
0703:
0704: // Distribute backends among controllers knowing that at this point
0705: // there is no conflict on the backend distribution policies.
0706: broadcastBackendInformation(getAllMemberButUs());
0707: }
0708:
0709: // Now let group comm messages flow in
0710: isVirtualDatabaseStarted = true;
0711:
0712: // Resync the recovery log, if any
0713: if (resyncRecoveryLog && (groupSize > 1)
0714: && hasRecoveryLog()) {
0715: logger.info("Resyncing recovery log ...");
0716: resyncRecoveryLog();
0717: logger.info("Resyncing recovery log done");
0718: }
0719:
0720: initGlobalCounters(getControllerId());
0721:
0722: recoveryLog.clearLastManDown();
0723:
0724: } catch (Exception e) {
0725: if (channel != null) {
0726: quitChannel();
0727: }
0728: String msg = Translate.get(
0729: "virtualdatabase.distributed.joingroup.error",
0730: groupName);
0731: if (e instanceof RuntimeException)
0732: logger.error(msg, e);
0733: throw new Exception(msg + " (" + e + ")", e);
0734: }
0735:
0736: }
0737:
0738: private void reconcile(List suspectedMembers) {
0739: try {
0740: distributedRequestManager.blockActivity();
0741: } catch (SQLException e) {
0742: logger.warn(e.getMessage(), e);
0743: }
0744:
0745: boolean shutdownLocally = false;
0746:
0747: Iterator iter = suspectedMembers.iterator();
0748: while (iter.hasNext()) {
0749: Member other = (Member) iter.next();
0750: if (isLocalSender(other)) {
0751: continue;
0752: }
0753: partitionReconciler = new PartitionReconciler(name,
0754: groupName, channel.getLocalMembership(),
0755: hederaPropertiesFile);
0756:
0757: PartitionReconciliationStatus reconciliationStatus;
0758: try {
0759: reconciliationStatus = partitionReconciler
0760: .reconcileWith(other);
0761: partitionReconciler.dispose();
0762: } catch (Exception e) {
0763: logger.error("Exception while reconciling with "
0764: + other, e);
0765: // in doubt, shutdown arbitrarily one of the vdb parts
0766: shutdownUnlessFirstMember(suspectedMembers);
0767: return;
0768: }
0769: String msg = "reconciliation status = "
0770: + reconciliationStatus + ", other = " + other;
0771: logger.info(msg);
0772: endUserLogger.info(msg);
0773:
0774: if (reconciliationStatus == PartitionReconciliationStatus.NO_ACTIVITY) {
0775: try {
0776: Collections.sort(suspectedMembers);
0777: Member firstMember = (Member) suspectedMembers
0778: .get(0);
0779: if (!(firstMember.equals(channel
0780: .getLocalMembership()))) {
0781: logger.info(channel.getLocalMembership()
0782: + " is rejoining group " + currentGroup
0783: + "...");
0784: quitChannel();
0785: channelShuttingDown = false;
0786: joinGroup(false);
0787: logger.info(channel.getLocalMembership()
0788: + " has rejoined group " + currentGroup
0789: + "...");
0790: }
0791: distributedRequestManager.resumeActivity();
0792: } catch (Exception e) {
0793: logger.error("Exception while reconciling with "
0794: + other, e);
0795: // in doubt, shutdown arbitrarily one of the vdb parts
0796: shutdownUnlessFirstMember(suspectedMembers);
0797: }
0798: return;
0799: }
0800: if (reconciliationStatus == PartitionReconciliationStatus.ALONE_IN_THE_WORLD) {
0801: shutdown(Constants.SHUTDOWN_FORCE);
0802: return;
0803: }
0804: if (reconciliationStatus == PartitionReconciliationStatus.OTHER_ALONE_IN_THE_WORLD) {
0805: // do nothing
0806: continue;
0807: }
0808: // in all other cases, decide arbitrarily which vdb part will survive
0809: shutdownLocally = shutdownUnlessFirstMember(suspectedMembers);
0810: if (shutdownLocally) {
0811: return;
0812: }
0813: }
0814: if (logger.isInfoEnabled()) {
0815: logger.info("End of reconciliation process for " + name
0816: + ". Resume activity normally.");
0817: }
0818: distributedRequestManager.resumeActivity();
0819: }
0820:
0821: /*
0822: * Shutdown local member unless it is the first in the members list (sorted
0823: * alphabetically).
0824: */
0825: private boolean shutdownUnlessFirstMember(List members) {
0826: Collections.sort(members);
0827: Member firstMember = (Member) members.get(0);
0828: logger.fatal(firstMember + " will remain as master.");
0829: if (!(firstMember.equals(channel.getLocalMembership()))) {
0830: logger.fatal("Forcing virtual database shutdown here at "
0831: + channel.getLocalMembership() + ".");
0832: distributedRequestManager.resumeActivity();
0833: shutdown(Constants.SHUTDOWN_FORCE);
0834: return true;
0835: } else {
0836: logger.fatal("Virtual database here at "
0837: + channel.getLocalMembership()
0838: + " remaining as master.");
0839: return false;
0840: }
0841: }
0842:
0843: /**
0844: * Checks if re-synchronizing the recovery log is necessary, and if so,
0845: * initiates the recovery log recovery process and waits until it is complete.
0846: * This is called as part of the vdb startup process, before backends are
0847: * ready to be enabled.
0848: *
0849: * @throws VirtualDatabaseException in case of error
0850: * @throws SQLException rethrown from recoverylog operations
0851: */
0852: private void resyncRecoveryLog() throws VirtualDatabaseException,
0853: SQLException {
0854: if (getAllMembers().size() == 1) {
0855: logger
0856: .info("First controller in vdb, no recovery log resync.");
0857: return;
0858: }
0859:
0860: String lastShutdownCheckpointName = getLastShutdownCheckpointName();
0861: if (lastShutdownCheckpointName == null) {
0862: // Reset recovery log cleanup if JVM property
0863: // "reset.dirty.recoverylog" is set to false (default is true)
0864: if ("true".equals(System.getProperty(
0865: "reset.dirty.recoverylog", "true"))) {
0866: logger
0867: .info("No shutdown checkpoint found in recovery log. Clearing recovery log (dirty).");
0868: logger
0869: .info("Please resync manually using 'restore log'.");
0870: getRecoveryLog().resetRecoveryLog(false);
0871: } else {
0872: logger
0873: .info("No shutdown checkpoint found in recovery log: please resync manually using 'restore log'.");
0874: logger
0875: .warn("Keeping recovery log for debug purpose only because reset.dirty.recoverylog property is set to false. \n"
0876: + "DO NOT PERFORM ADMIN COMMANDS ON THIS VIRTUAL DATABASE ON THIS NODE BEFORE RESTORING THE RECOVERY LOG.");
0877: }
0878: return;
0879: }
0880:
0881: // try to resync from the last shutdown checkpoint
0882: isResynchingFlag = true;
0883:
0884: try {
0885: logger.info("Resyncing from " + lastShutdownCheckpointName);
0886: resyncFromCheckpoint(lastShutdownCheckpointName);
0887: } catch (VirtualDatabaseException e) {
0888: // Reset recovery log cleanup if JVM property
0889: // "reset.dirty.recoverylog" is set to false (default is true)
0890: if ("true".equals(System.getProperty(
0891: "reset.dirty.recoverylog", "true"))) {
0892: logger
0893: .error("Failed to resync recovery log from last clean shutdown checkpoint. Clearing recovery log (dirty).");
0894: logger
0895: .info("Please resync manually using 'restore log'.");
0896: getRecoveryLog().resetRecoveryLog(false);
0897: } else {
0898: logger
0899: .error("Failed to resync recovery log from last clean shutdown checkpoint.");
0900: logger
0901: .warn("Keeping recovery log for debug purpose only because reset.dirty.recoverylog property is set to false. \n"
0902: + "DO NOT PERFORM ADMIN COMMANDS ON THIS VIRTUAL DATABASE ON THIS NODE BEFORE RESTORING THE RECOVERY LOG.");
0903: }
0904: isResynchingFlag = false;
0905: }
0906: }
0907:
0908: /**
0909: * Initializes global counters based on recovery log.
0910: *
0911: * @param controllerId this controller id, as allocated by hte group
0912: * communication. Base of all global counters numbering: counters are
0913: * layed out as [ controllerId | <local count> ]
0914: * @throws SQLException if an error occurs accessing the recovery log
0915: */
0916: public void initGlobalCounters(long controllerId)
0917: throws SQLException {
0918: // Counters update should be properly synchronized since this it may happen
0919: // concurrently with a vdb worker thread delivering unique IDs. This
0920: // scenario occurs when using the "restore log" admin command following a
0921: // vdb crash.
0922: if (!hasRecoveryLog())
0923: return; // no recovery log: no init. Stick to default values (zero)
0924: RecoveryLog recoveryLog = requestManager.getRecoveryLog();
0925: requestManager.initializeRequestId(recoveryLog
0926: .getLastRequestId(controllerId) + 1);
0927: requestManager.getScheduler().initializeTransactionId(
0928: recoveryLog.getLastTransactionId(controllerId) + 1);
0929: synchronized (CONNECTION_ID_SYNC_OBJECT) {
0930: // Use the max operator as a safeguard: IDs may have been delivered but
0931: // not logged yet.
0932: this .connectionId = Math.max(this .connectionId, recoveryLog
0933: .getLastConnectionId(controllerId)) + 1;
0934: }
0935: }
0936:
0937: private void resyncFromCheckpoint(String checkpointName)
0938: throws VirtualDatabaseException {
0939: // talk to first other member in group
0940: Member remoteControllerMember = (Member) getAllMemberButUs()
0941: .get(0);
0942:
0943: // send message, no return expected, just block until it's done.
0944: sendMessageToController(remoteControllerMember,
0945: new ResyncRecoveryLog(checkpointName), messageTimeouts
0946: .getDefaultTimeout());
0947:
0948: // If a remote error occured, or if the resync failed for any reason
0949: // whatsoever, a VirtualDatabaseException has been thrown.
0950:
0951: isResynchingFlag = false;
0952: }
0953:
0954: protected boolean isResyncing() {
0955: return isResynchingFlag;
0956: }
0957:
0958: /**
0959: * Return the last ShutdownCheckpointName, or null if none was found.
0960: *
0961: * @return the last ShutdownCheckpointName, or null if none was found.
0962: * @throws VirtualDatabaseException in case getting the cp names list from the
0963: * recovery log failed
0964: */
0965: private String getLastShutdownCheckpointName()
0966: throws VirtualDatabaseException {
0967: // get checkpoint names and see which is the last shutdown checkpoint. This
0968: // list is expected to be ordered, newest first.
0969: ArrayList checkpointNames;
0970: try {
0971: checkpointNames = getRecoveryLog().getCheckpointNames();
0972: } catch (SQLException e) {
0973: logger.error(e.getMessage());
0974: throw new VirtualDatabaseException(e);
0975: }
0976:
0977: Iterator iter = checkpointNames.iterator();
0978: while (iter.hasNext()) {
0979: String cpName = (String) iter.next();
0980: if (cpName.startsWith("shutdown-" + getControllerName()))
0981: return cpName;
0982: }
0983: return null;
0984: }
0985:
0986: /**
0987: * @see org.continuent.hedera.gms.GroupMembershipListener#joinMember(org.continuent.hedera.common.Member,
0988: * org.continuent.hedera.common.GroupIdentifier)
0989: */
0990: public void joinMember(Member m, GroupIdentifier gid) {
0991: if (hasRecoveryLog()) {
0992: try {
0993: requestManager.getRecoveryLog()
0994: .storeCheckpoint(
0995: buildCheckpointName(m
0996: + " joined group " + gid));
0997: } catch (SQLException ignore) {
0998: logger
0999: .warn("Failed to log checkpoint for joining member "
1000: + m);
1001: }
1002: }
1003: }
1004:
1005: //
1006: // Message dispatcher request handling
1007: //
1008:
1009: /**
1010: * Terminate the multicast request adapter and quit the Hedera channel.
1011: */
1012: public void quitChannel() {
1013: quitChannel(Constants.SHUTDOWN_SAFE);
1014: }
1015:
1016: /**
1017: * Terminate the multicast request adapter and quit the Hedera channel.
1018: *
1019: * @param level of the vdb shutdown operation being executed
1020: */
1021: public void quitChannel(int level) {
1022: if (level == Constants.SHUTDOWN_FORCE) {
1023: multicastRequestAdapter.cancelRequests();
1024: }
1025: synchronized (MESSAGES_IN_HANDLER_SYNC) {
1026: channelShuttingDown = true;
1027: if (messagesInHandlers > 0)
1028: try {
1029: MESSAGES_IN_HANDLER_SYNC.wait();
1030: } catch (InterruptedException ignore) {
1031: }
1032: }
1033:
1034: if (multicastRequestAdapter != null) {
1035: multicastRequestAdapter.stop();
1036: multicastRequestAdapter = null;
1037: }
1038: if (channel != null) {
1039: channel.close();
1040: try {
1041: channel.quit();
1042: } catch (ChannelException e) {
1043: if (logger.isWarnEnabled()) {
1044: logger.warn("Problem when quitting channel "
1045: + channel, e);
1046: }
1047: } catch (NotConnectedException e) {
1048: if (logger.isWarnEnabled()) {
1049: logger.warn("Problem when quitting channel "
1050: + channel, e);
1051: }
1052: }
1053: channel = null;
1054: }
1055: if (groupCommunicationFactory != null) {
1056: // If we were not able to successfully dispose the factory we must keep a
1057: // reference to it in order to dispose it later
1058: if (groupCommunicationFactory.dispose())
1059: groupCommunicationFactory = null;
1060: }
1061: }
1062:
1063: /**
1064: * @see org.continuent.hedera.gms.GroupMembershipListener#quitMember(org.continuent.hedera.common.Member,
1065: * org.continuent.hedera.common.GroupIdentifier)
1066: */
1067: public void quitMember(Member m, GroupIdentifier gid) {
1068: synchronized (MESSAGES_IN_HANDLER_SYNC) {
1069: // Ignore if channel has been closed (i.e. vdb shutdown)
1070: if ((channel == null) || (channelShuttingDown))
1071: return;
1072: messagesInHandlers++;
1073: }
1074:
1075: try {
1076: // Ignore our own quit message
1077: if (isLocalSender(m))
1078: return;
1079:
1080: if (hasRecoveryLog()) {
1081: try {
1082: requestManager.getRecoveryLog().storeCheckpoint(
1083: buildCheckpointName(m + " quit group "
1084: + gid));
1085: } catch (SQLException ignore) {
1086: logger
1087: .warn("Failed to log checkpoint for quitting member "
1088: + m);
1089: }
1090: }
1091:
1092: ActivityService.getInstance().addUnreachableMember(name,
1093: m.getAddress());
1094:
1095: // Remove controller from list and notify JMX listeners
1096: String remoteControllerName = removeRemoteControllerAndStartCleanupThread(m);
1097: if (remoteControllerName != null) {
1098: endUserLogger.warn(Translate.get(
1099: "notification.distributed.controller.removed",
1100: new String[] { m.toString(), name }));
1101: logger.warn("Controller " + m
1102: + " has left the cluster.");
1103: sendJmxNotification(
1104: SequoiaNotificationList.DISTRIBUTED_CONTROLLER_REMOVED,
1105: Translate
1106: .get(
1107: "notification.distributed.controller.removed",
1108: m, name));
1109: }
1110:
1111: // Notify adapter that we do not expect responses anymore from this member
1112: synchronized (MESSAGES_IN_HANDLER_SYNC) {
1113: // Ignore if channel is being closed, i.e. vdb shutdown.
1114: if (!channelShuttingDown) {
1115: int failures = multicastRequestAdapter
1116: .memberFailsOnAllReplies(m);
1117: logger.info(failures
1118: + " requests were waiting responses from "
1119: + m);
1120: }
1121: }
1122: } finally {
1123: synchronized (MESSAGES_IN_HANDLER_SYNC) {
1124: messagesInHandlers--;
1125: }
1126: }
1127: }
1128:
1129: /**
1130: * @see org.continuent.hedera.adapters.MessageListener#receive(java.io.Serializable)
1131: */
1132: public void receive(Serializable msg) {
1133: logger
1134: .error("Distributed virtual database received unhandled message: "
1135: + msg);
1136: }
1137:
1138: /**
1139: * Refresh the current group membership when someone has joined or left the
1140: * group.
1141: */
1142: private void refreshGroupMembership() {
1143: if (logger.isDebugEnabled())
1144: logger.debug("Refreshing members list:"
1145: + currentGroup.getMembers());
1146:
1147: synchronized (controllerJmxAddress) {
1148: allMemberButUs = (ArrayList) (((ArrayList) currentGroup
1149: .getMembers()).clone());
1150: allMemberButUs.remove(channel.getLocalMembership());
1151: }
1152: }
1153:
1154: //
1155: // Getter/Setter and tools (equals, ...)
1156: //
1157:
1158: /**
1159: * Two virtual databases are equal if they have the same name, login and
1160: * password.
1161: *
1162: * @param other an object
1163: * @return a <code>boolean</code> value
1164: */
1165: public boolean equals(Object other) {
1166: if ((other == null)
1167: || (!(other instanceof org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase)))
1168: return false;
1169: else {
1170: DistributedVirtualDatabase db = (org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase) other;
1171: return name.equals(db.getDatabaseName())
1172: && groupName.equals(db.getGroupName());
1173: }
1174: }
1175:
1176: /**
1177: * Synchronized access to current group members.
1178: *
1179: * @return a clone of the list of all members (never null).
1180: */
1181: public ArrayList getAllMembers() {
1182: // FIXME Should not return ArrayList but rather List
1183: synchronized (controllerJmxAddress) {
1184: if (currentGroup == null) // this happens if we did not #joinGroup()
1185: return new ArrayList();
1186: ArrayList members = (ArrayList) currentGroup.getMembers();
1187: if (members == null) // SEQUOIA-745 fix
1188: return new ArrayList();
1189: return (ArrayList) members.clone();
1190: }
1191: }
1192:
1193: /**
1194: * Returns the list of all members in the group except us. Consider the value
1195: * read-only (do not alter).
1196: *
1197: * @return the allMembersButUs field (never null).
1198: */
1199: public ArrayList getAllMemberButUs() {
1200: synchronized (controllerJmxAddress) {
1201: if (allMemberButUs == null) // this happens if we did not #joinGroup()
1202: return new ArrayList();
1203:
1204: /**
1205: * This synchronized block might seem loussy, but actually it's enough, as
1206: * long as no caller alters the returned value: field allMembersButUs is
1207: * replaced (as opposed to updated) by refreshGroupMembership(). So
1208: * someone who has called this lives with a (possibly) outdated list, but,
1209: * still, with a safe list (never updated concurently by vdb threads). If
1210: * clients/callers are not trusted to leave the returned value un-touched,
1211: * use a clone
1212: */
1213: return allMemberButUs;
1214: }
1215: }
1216:
1217: /**
1218: * Get the group channel used for group communications
1219: *
1220: * @return a <code>JChannel</code>
1221: */
1222: public AbstractReliableGroupChannel getChannel() {
1223: return channel;
1224: }
1225:
1226: /**
1227: * Returns the cleanupThreads value.
1228: *
1229: * @return Returns the cleanupThreads.
1230: */
1231: public Hashtable getCleanupThreads() {
1232: return cleanupThreads;
1233: }
1234:
1235: /**
1236: * Used by the ControllerFailureCleanupThread to cleanup following a
1237: * controller failure. This returned the list of recovered transactions and
1238: * removes the list.
1239: *
1240: * @param controllerId the id of the failed controller
1241: * @return List of recovered transactions for the given controller id (null if
1242: * none)
1243: */
1244: public List getTransactionsRecovered(Long controllerId) {
1245: return (List) controllerTransactionsRecovered
1246: .remove(controllerId);
1247: }
1248:
1249: /**
1250: * Used by the ControllerFailureCleanupThread to cleanup following a
1251: * controller failure. This returned the list of recovered persistent
1252: * connections and removes the list.
1253: *
1254: * @param controllerId the id of the failed controller
1255: * @return List of recovered persistent connections for the given controller
1256: * id (null if none)
1257: */
1258: public List getControllerPersistentConnectionsRecovered(
1259: Long controllerId) {
1260: return (List) controllerPersistentConnectionsRecovered
1261: .remove(controllerId);
1262: }
1263:
1264: /**
1265: * Called by FailoverForPersistentConnection when a client reconnects
1266: * following a controller failure.
1267: *
1268: * @param controllerId the id of the failed controller
1269: * @param connectionId the id of the persistent connection
1270: */
1271: public void notifyPersistentConnectionFailover(Long controllerId,
1272: Long connectionId) {
1273: synchronized (controllerPersistentConnectionsRecovered) {
1274: LinkedList persistentConnectionsRecovered = (LinkedList) controllerPersistentConnectionsRecovered
1275: .get(controllerId);
1276: if (persistentConnectionsRecovered == null) {
1277: persistentConnectionsRecovered = new LinkedList();
1278: controllerPersistentConnectionsRecovered.put(
1279: controllerId, persistentConnectionsRecovered);
1280: }
1281:
1282: persistentConnectionsRecovered.add(connectionId);
1283: if (logger.isInfoEnabled())
1284: logger
1285: .info("Failover detected for persistent connection "
1286: + connectionId);
1287: }
1288: }
1289:
1290: /**
1291: * Called by FailoverForTransaction when a client reconnects following a
1292: * controller failure.
1293: *
1294: * @param controllerId the id of the failed controller
1295: * @param transactionId the id of the transaction
1296: */
1297: public void notifyTransactionFailover(Long controllerId,
1298: Long transactionId) {
1299: synchronized (controllerTransactionsRecovered) {
1300: LinkedList transactionsRecovered = (LinkedList) controllerTransactionsRecovered
1301: .get(controllerId);
1302: if (transactionsRecovered == null) {
1303: transactionsRecovered = new LinkedList();
1304: controllerTransactionsRecovered.put(controllerId,
1305: transactionsRecovered);
1306: }
1307:
1308: transactionsRecovered.add(transactionId);
1309: if (logger.isInfoEnabled())
1310: logger.info("Failover detected for transaction "
1311: + transactionId);
1312: }
1313: }
1314:
1315: /**
1316: * Gets a Controller specified by its name as a Member object suitable for
1317: * group communication.
1318: *
1319: * @param controllerName the name of the target controller
1320: * @return a Member representing the target controller
1321: * @throws VirtualDatabaseException
1322: */
1323: private Member getControllerByName(String controllerName)
1324: throws VirtualDatabaseException {
1325: // Get the target controller
1326: Iterator iter = controllerJmxAddress.entrySet().iterator();
1327: Member targetMember = null;
1328: while (iter.hasNext()) {
1329: Entry entry = (Entry) iter.next();
1330: if (entry.getValue().equals(controllerName)) {
1331: targetMember = (Member) entry.getKey();
1332: break;
1333: }
1334: }
1335: if (targetMember == null)
1336: throw new VirtualDatabaseException(
1337: "Cannot find controller " + controllerName
1338: + " in group");
1339: return targetMember;
1340: }
1341:
1342: /**
1343: * Returns the controllerName value.
1344: *
1345: * @return Returns the controllerName.
1346: */
1347: public String getControllerName() {
1348: return controller.getControllerName();
1349: }
1350:
1351: /**
1352: * Returns the controller ID.
1353: *
1354: * @return Returns the controller ID.
1355: */
1356: public long getControllerId() {
1357: return ((DistributedRequestManager) requestManager)
1358: .getControllerId();
1359: }
1360:
1361: /**
1362: * Returns the currentGroup value.
1363: *
1364: * @return Returns the currentGroup.
1365: */
1366: public Group getCurrentGroup() {
1367: return currentGroup;
1368: }
1369:
1370: /**
1371: * Returns the distributedRequestLogger value.
1372: *
1373: * @return Returns the distributedRequestLogger.
1374: */
1375: public final Trace getDistributedRequestLogger() {
1376: return distributedRequestLogger;
1377: }
1378:
1379: /**
1380: * Get the XML dump of the Distribution element.
1381: *
1382: * @return XML dump of the Distribution element
1383: */
1384: protected String getDistributionXml() {
1385: StringBuffer info = new StringBuffer();
1386: info.append("<" + DatabasesXmlTags.ELT_Distribution + " "
1387: + DatabasesXmlTags.ATT_groupName + "=\"" + groupName
1388: + "\" " + DatabasesXmlTags.ATT_hederaPropertiesFile
1389: + "=\"" + hederaPropertiesFile + "\" "
1390: + DatabasesXmlTags.ATT_clientFailoverTimeout + "=\""
1391: + failoverTimeoutInMs + "\">");
1392:
1393: getMessageTimeouts().generateXml(info);
1394:
1395: info.append("</" + DatabasesXmlTags.ELT_Distribution + ">");
1396: return info.toString();
1397: }
1398:
1399: /**
1400: * Returns the group name this virtual database belongs to.
1401: *
1402: * @return a <code>String</code> value. Returns <code>null</code> if this
1403: * virtual database is standalone
1404: */
1405: public String getGroupName() {
1406: return groupName;
1407: }
1408:
1409: /**
1410: * Sets the group name used by the controllers hosting this virtual database.
1411: *
1412: * @param groupName the group name to set
1413: */
1414: public void setGroupName(String groupName) {
1415: this .groupName = groupName;
1416: }
1417:
1418: /**
1419: * Returns the messageTimeouts value.
1420: *
1421: * @return Returns the messageTimeouts.
1422: */
1423: public MessageTimeouts getMessageTimeouts() {
1424: return messageTimeouts;
1425: }
1426:
1427: /**
1428: * Sets the messageTimeouts value.
1429: *
1430: * @param messageTimeouts The messageTimeouts to set.
1431: */
1432: public void setMessageTimeouts(MessageTimeouts messageTimeouts) {
1433: this .messageTimeouts = messageTimeouts;
1434: }
1435:
1436: /**
1437: * Return the group communication multicast request adapter.
1438: *
1439: * @return the group communication multicast request adapter
1440: */
1441: public MulticastRequestAdapter getMulticastRequestAdapter() {
1442: return multicastRequestAdapter;
1443: }
1444:
1445: //
1446: // Getter/Setter and tools (equals, ...)
1447: //
1448:
1449: /**
1450: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#getNextConnectionId()
1451: */
1452: public long getNextConnectionId() {
1453: long id = super .getNextConnectionId();
1454: return distributedRequestManager.getNextConnectionId(id);
1455: }
1456:
1457: //
1458: // Getter/Setter and tools (equals, ...)
1459: //
1460:
1461: //
1462: // Getter/Setter and tools (equals, ...)
1463: //
1464:
1465: protected int getNumberOfEnabledBackends()
1466: throws VirtualDatabaseException {
1467: // 1/ get number of local active backends
1468: int nbActive = super .getNumberOfEnabledBackends();
1469:
1470: // 2/ add remote active backends
1471:
1472: // TODO: synchronize this access to backendsPerController (and others)
1473: DatabaseBackend b;
1474: Iterator iter = backendsPerController.keySet().iterator();
1475: while (iter.hasNext()) {
1476: Member member = (Member) iter.next();
1477:
1478: List remoteBackends = (List) backendsPerController
1479: .get(member);
1480: int size = remoteBackends.size();
1481: b = null;
1482: for (int i = 0; i < size; i++) {
1483: b = (DatabaseBackend) remoteBackends.get(i);
1484: if (b.isReadEnabled() || b.isWriteEnabled())
1485: // test symetrical to RequestManager.backupBackend()
1486: nbActive++;
1487: }
1488: }
1489: /*
1490: * temporary, until backendsPerController is really updated (not yet done),
1491: * make as is force=true in backupBackend().
1492: */
1493: nbActive = -1;
1494: return nbActive;
1495: }
1496:
1497: /**
1498: * Return a ControllerResultSet containing the PreparedStatement metaData of
1499: * the given sql template
1500: *
1501: * @param request the request containing the sql template
1502: * @return an empty ControllerResultSet with the metadata
1503: * @throws SQLException if a database error occurs
1504: */
1505: public ControllerResultSet getPreparedStatementGetMetaData(
1506: AbstractRequest request) throws SQLException {
1507: try {
1508: return requestManager
1509: .getPreparedStatementGetMetaData(request);
1510: } catch (NoMoreBackendException e) {
1511: // Try remote controllers
1512: try {
1513: MulticastResponse rspList = getMulticastRequestAdapter()
1514: .multicastMessage(
1515: getAllMemberButUs(),
1516: new GetPreparedStatementMetadata(
1517: request),
1518: MulticastRequestAdapter.WAIT_ALL,
1519: getMessageTimeouts()
1520: .getVirtualDatabaseConfigurationTimeout());
1521:
1522: Map results = rspList.getResults();
1523: if (results.size() == 0)
1524: if (logger.isWarnEnabled())
1525: logger
1526: .warn("No response while getting prepared statement metadata from remote controller");
1527: for (Iterator iter = results.values().iterator(); iter
1528: .hasNext();) {
1529: Object response = iter.next();
1530: if (response instanceof ControllerException) {
1531: if (logger.isErrorEnabled()) {
1532: logger
1533: .error("Error while getting prepared statement metadata from remote controller");
1534: }
1535: } else {
1536: // Here we succeded in getting prepared statement metadata from a
1537: // remote controller
1538: return (ControllerResultSet) response;
1539: }
1540: }
1541: } catch (NotConnectedException e2) {
1542: if (logger.isErrorEnabled())
1543: logger
1544: .error(
1545: "Channel unavailable while getting prepared statement metadata from remote controller",
1546: e2);
1547: }
1548:
1549: // Here we didn't succeded in getting prepared statement metadata from
1550: // another controller
1551: throw e;
1552: }
1553: }
1554:
1555: /**
1556: * Returns the processMacroBeforeBroadcast value.
1557: *
1558: * @return Returns the processMacroBeforeBroadcast.
1559: */
1560: public boolean isProcessMacroBeforeBroadcast() {
1561: return processMacroBeforeBroadcast;
1562: }
1563:
1564: /**
1565: * Sets the processMacroBeforeBroadcast value.
1566: *
1567: * @param processMacros true if macros must be processed before broadcast.
1568: */
1569: public void setProcessMacroBeforeBroadcast(boolean processMacros) {
1570: this .processMacroBeforeBroadcast = processMacros;
1571: }
1572:
1573: /**
1574: * Returns the request result failover cache associated to this distributed
1575: * virtual database.
1576: *
1577: * @return a <code>RequestResultFailoverCache</code> object.
1578: */
1579: public RequestResultFailoverCache getRequestResultFailoverCache() {
1580: return requestResultFailoverCache;
1581: }
1582:
1583: /**
1584: * Sets a new distributed request manager for this database.
1585: *
1586: * @param requestManager the new request manager.
1587: */
1588: public void setRequestManager(RequestManager requestManager) {
1589: if (!(requestManager instanceof DistributedRequestManager))
1590: throw new RuntimeException(
1591: "A distributed virtual database can only work with a distributed request manager.");
1592:
1593: distributedRequestManager = (DistributedRequestManager) requestManager;
1594: // really, this is super.requestManager
1595: this .requestManager = distributedRequestManager;
1596: }
1597:
1598: /**
1599: * Get the whole static metadata for this virtual database. A new empty
1600: * metadata object is created if there was none yet. It will be filled later
1601: * by gatherStaticMetadata() when the backend is enabled.
1602: *
1603: * @return Virtual database static metadata
1604: */
1605: public VirtualDatabaseStaticMetaData getStaticMetaData() {
1606: staticMetadata = doGetStaticMetaData();
1607:
1608: // If no backends enabled and vdb is distributed try remote controllers
1609: if ((staticMetadata == null)
1610: || (staticMetadata.getMetadataContainer() == null)) {
1611: try {
1612: MulticastResponse rspList = getMulticastRequestAdapter()
1613: .multicastMessage(
1614: getAllMemberButUs(),
1615: new GetStaticMetadata(),
1616: MulticastRequestAdapter.WAIT_ALL,
1617: getMessageTimeouts()
1618: .getVirtualDatabaseConfigurationTimeout());
1619:
1620: Map results = rspList.getResults();
1621: if (results.size() == 0)
1622: if (logger.isWarnEnabled())
1623: logger
1624: .warn("No response while getting static metadata from remote controller");
1625: for (Iterator iter = results.values().iterator(); iter
1626: .hasNext();) {
1627: Object response = iter.next();
1628: if (response instanceof ControllerException) {
1629: if (logger.isErrorEnabled()) {
1630: logger
1631: .error("Error while getting static metadata from remote controller");
1632: }
1633: } else {
1634: // Here we succeded in getting static metadata from a remote
1635: // controller
1636: staticMetadata
1637: .setMetadataContainer((MetadataContainer) response);
1638: }
1639: }
1640: } catch (NotConnectedException e2) {
1641: if (logger.isErrorEnabled())
1642: logger
1643: .error(
1644: "Channel unavailable while getting static metadata from remote controller",
1645: e2);
1646: }
1647: }
1648:
1649: return staticMetadata;
1650: }
1651:
1652: /**
1653: * Check if the given backend definition is compatible with the backend
1654: * definitions of this distributed virtual database. Not that if the given
1655: * backend does not exist in the current configuration, it is considered as
1656: * compatible. Incompatibility results from 2 backends with the same JDBC URL
1657: * or same logical name.
1658: *
1659: * @param backend the backend to check
1660: * @return true if the backend is compatible with the local definition
1661: * @throws VirtualDatabaseException if locking the local backend list fails
1662: */
1663: public boolean isCompatibleBackend(BackendInfo backend)
1664: throws VirtualDatabaseException {
1665: try {
1666: acquireReadLockBackendLists();
1667: } catch (InterruptedException e) {
1668: String msg = "Unable to acquire read lock on backend list in isCompatibleBackend ("
1669: + e + ")";
1670: logger.error(msg);
1671: throw new VirtualDatabaseException(msg);
1672: }
1673:
1674: try {
1675: // Find the backend
1676: String backendURL = backend.getUrl();
1677: String backendName = backend.getName();
1678: int size = backends.size();
1679: DatabaseBackend b = null;
1680: for (int i = 0; i < size; i++) {
1681: b = (DatabaseBackend) backends.get(i);
1682: if (b.getURL().equals(backendURL)
1683: || b.getName().equals(backendName))
1684: return false;
1685: }
1686: } catch (RuntimeException re) {
1687: throw new VirtualDatabaseException(re);
1688: } finally {
1689: releaseReadLockBackendLists();
1690: }
1691: // This backend does not exist here
1692: return true;
1693: }
1694:
1695: /**
1696: * Return true if the provided schema is compatible with the existing schema
1697: * of this distributed virtual database. Note that if the given schema is
1698: * null, this function returns true.
1699: *
1700: * @param dbs the database schema to compare with
1701: * @return true if dbs is compatible with the current schema (according to
1702: * RAIDb level)
1703: */
1704: public boolean isCompatibleDatabaseSchema(DatabaseSchema dbs) {
1705: // Database schema checking (if any)
1706: if (dbs == null) {
1707: logger
1708: .warn(Translate
1709: .get("virtualdatabase.distributed.configuration.checking.noschema"));
1710: } else {
1711: // Check database schemas compatibility
1712: switch (getRequestManager().getLoadBalancer()
1713: .getRAIDbLevel()) {
1714: case RAIDbLevels.RAIDb0:
1715: // There must be no overlap between schemas
1716: if (dbs.equals(getRequestManager().getDatabaseSchema())) {
1717: logger
1718: .warn(Translate
1719: .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
1720: return false;
1721: }
1722: break;
1723: case RAIDbLevels.RAIDb1:
1724: // Schemas must be identical
1725: if (!dbs
1726: .equals(getRequestManager().getDatabaseSchema())) {
1727: logger
1728: .warn(Translate
1729: .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
1730: return false;
1731: }
1732: break;
1733: case RAIDbLevels.RAIDb2:
1734: // Common parts of the schema must be identical
1735: if (!dbs.isCompatibleWith(getRequestManager()
1736: .getDatabaseSchema())) {
1737: logger
1738: .warn(Translate
1739: .get("virtualdatabase.distributed.configuration.checking.mismatch.databaseschema"));
1740: return false;
1741: }
1742: break;
1743: case RAIDbLevels.SingleDB:
1744: default:
1745: logger.error("Unsupported RAIDb level: "
1746: + getRequestManager().getLoadBalancer()
1747: .getRAIDbLevel());
1748: return false;
1749: }
1750: }
1751: return true;
1752: }
1753:
1754: /**
1755: * Is this virtual database distributed ?
1756: *
1757: * @return true
1758: */
1759: public boolean isDistributed() {
1760: return true;
1761: }
1762:
1763: /**
1764: * Returns the isVirtualDatabaseStarted value.
1765: *
1766: * @return Returns the isVirtualDatabaseStarted.
1767: */
1768: public final boolean isVirtualDatabaseStarted() {
1769: return isVirtualDatabaseStarted;
1770: }
1771:
1772: /**
1773: * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#getControllers()
1774: */
1775: public String[] viewControllerList() {
1776: if (logger.isInfoEnabled()) {
1777: logger.info(channel.getLocalMembership() + " see members:"
1778: + currentGroup.getMembers() + " and has mapping:"
1779: + controllerJmxAddress);
1780: }
1781: Collection controllerJmxNames = controllerJmxAddress.values();
1782: return (String[]) controllerJmxNames
1783: .toArray(new String[controllerJmxNames.size()]);
1784: }
1785:
1786: /**
1787: * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#viewGroupBackends()
1788: */
1789: public Hashtable viewGroupBackends()
1790: throws VirtualDatabaseException {
1791: Hashtable map = super .viewGroupBackends();
1792: synchronized (backendsPerController) {
1793: Iterator iter = backendsPerController.keySet().iterator();
1794: while (iter.hasNext()) {
1795: Member member = (Member) iter.next();
1796:
1797: // Create an List<BackendInfo> from the member backend list
1798: List backends = (List) backendsPerController
1799: .get(member);
1800: List backendInfos = DatabaseBackend
1801: .toBackendInfos(backends);
1802: map.put(controllerJmxAddress.get(member), backendInfos);
1803: }
1804: }
1805: return map;
1806: }
1807:
1808: //
1809: // Distributed virtual database management functions
1810: //
1811:
1812: /**
1813: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#addBackend(org.continuent.sequoia.controller.backend.DatabaseBackend)
1814: */
1815: public void addBackend(DatabaseBackend db)
1816: throws VirtualDatabaseException {
1817: // Add the backend to the virtual database.
1818: super .addBackend(db);
1819:
1820: // Send a group message if already joined group
1821: try {
1822: broadcastBackendInformation(getAllMemberButUs());
1823: } catch (Exception e) {
1824: String msg = "Error while broadcasting backend information when adding backend";
1825: logger.error(msg, e);
1826: throw new VirtualDatabaseException(msg, e);
1827: }
1828: }
1829:
1830: /**
1831: * Add a controller id to the controllerIds list.
1832: *
1833: * @param remoteControllerMembership the membership identifying the remote
1834: * controller
1835: * @param remoteControllerId remote controller identifier
1836: */
1837: public void addRemoteControllerId(
1838: Member remoteControllerMembership, long remoteControllerId) {
1839: controllerIds.put(remoteControllerMembership, new Long(
1840: remoteControllerId));
1841:
1842: if (logger.isDebugEnabled())
1843: logger.debug("Adding new controller id:"
1844: + remoteControllerId + " for member "
1845: + remoteControllerMembership);
1846: }
1847:
1848: /**
1849: * Add a list of remote backends to the backendsPerController map.
1850: *
1851: * @param sender the membership identifying the remote controller
1852: * @param remoteBackends remote controller backends
1853: */
1854: public void addBackendPerController(Member sender,
1855: List remoteBackends) {
1856: backendsPerController.put(sender, remoteBackends);
1857:
1858: if (logger.isInfoEnabled())
1859: logger
1860: .info(Translate
1861: .get(
1862: "virtualdatabase.distributed.configuration.updating.backend.list",
1863: sender));
1864: }
1865:
1866: /**
1867: * Returns the local view of the backends in this virtual database across all
1868: * <em>remote</em> controllers.
1869: *
1870: * @return a Hashtable<Member, List<DatabaseBackend>>
1871: */
1872: public Hashtable getBackendsPerController() {
1873: return backendsPerController;
1874: }
1875:
1876: /**
1877: * Add a new controller name to the controllerJmxAddress list and refresh the
1878: * group membership.
1879: *
1880: * @param remoteControllerMembership the membership identifying the remote
1881: * controller
1882: * @param remoteControllerJmxName the JMX name of the remote controller
1883: */
1884: public void addRemoteControllerJmxName(
1885: Member remoteControllerMembership,
1886: String remoteControllerJmxName) {
1887: controllerJmxAddress.put(remoteControllerMembership,
1888: remoteControllerJmxName);
1889: if (logger.isDebugEnabled())
1890: logger.debug("Adding new controller "
1891: + remoteControllerJmxName + " for member "
1892: + remoteControllerMembership);
1893:
1894: sendJmxNotification(
1895: SequoiaNotificationList.DISTRIBUTED_CONTROLLER_ADDED,
1896: Translate.get(
1897: "notification.distributed.controller.added",
1898: new String[] { remoteControllerJmxName, name }));
1899:
1900: refreshGroupMembership();
1901: }
1902:
1903: /**
1904: * Broadcast backend information among controllers.
1905: *
1906: * @param dest List of <code>Address</code> to send the message to
1907: * @throws NotConnectedException if the channel is not connected
1908: */
1909: private void broadcastBackendInformation(ArrayList dest)
1910: throws NotConnectedException {
1911: logger
1912: .debug(Translate
1913: .get("virtualdatabase.distributed.configuration.querying.remote.status"));
1914:
1915: // Send our backend status using serializable BackendInfo
1916: List backendInfos = DatabaseBackend.toBackendInfos(backends);
1917: MulticastResponse rspList = multicastRequestAdapter
1918: .multicastMessage(dest, new BackendStatus(backendInfos,
1919: distributedRequestManager.getControllerId()),
1920: MulticastRequestAdapter.WAIT_ALL,
1921: messageTimeouts.getBackendStatusTimeout());
1922:
1923: int size = dest.size();
1924: for (int i = 0; i < size; i++) {
1925: // Add the backend configuration of every remote controller
1926: Member m = (Member) dest.get(i);
1927: if (rspList.getResult(m) != null) {
1928: BackendStatus bs = (BackendStatus) rspList.getResult(m);
1929: // Update backend list from sender
1930: List remoteBackendInfos = bs.getBackendInfos();
1931: // convert the BackendInfos to DatabaseBackends
1932: List remoteBackends = BackendInfo
1933: .toDatabaseBackends(remoteBackendInfos);
1934: backendsPerController.put(m, remoteBackends);
1935: if (logger.isDebugEnabled())
1936: logger
1937: .debug(Translate
1938: .get(
1939: "virtualdatabase.distributed.configuration.updating.backend.list",
1940: m.toString()));
1941: } else
1942: logger
1943: .warn(Translate
1944: .get(
1945: "virtualdatabase.distributed.unable.get.remote.status",
1946: m.toString()));
1947: }
1948: }
1949:
1950: /**
1951: * Send the configuration of this controller to remote controller. All remote
1952: * controllers must agree on the compatibility of the local controller
1953: * configuration with their own configuration. Compatibility checking include
1954: * Authentication Manager, Scheduler and Load Balancer settings.
1955: *
1956: * @param dest List of <code>Address</code> to send the message to
1957: * @return INCOMPATIBLE_CONFIGURATION if the configuration is not compatible
1958: * with other controllers or the controller id to use otherwise.
1959: */
1960: private long checkConfigurationCompatibilityAndReturnControllerId(
1961: ArrayList dest) {
1962: if (logger.isInfoEnabled())
1963: logger
1964: .info(Translate
1965: .get("virtualdatabase.distributed.configuration.checking"));
1966:
1967: // Send our configuration
1968: MulticastResponse rspList;
1969: try {
1970: rspList = multicastRequestAdapter.multicastMessage(dest,
1971: new VirtualDatabaseConfiguration(this ),
1972: MulticastRequestAdapter.WAIT_ALL, messageTimeouts
1973: .getVirtualDatabaseConfigurationTimeout());
1974: } catch (NotConnectedException e) {
1975: logger
1976: .error(
1977: "Channel unavailable while checking configuration compatibility",
1978: e);
1979: return INCOMPATIBLE_CONFIGURATION;
1980: }
1981:
1982: // Check that everybody agreed
1983: Map results = rspList.getResults();
1984: int size = results.size();
1985: if (size == 0)
1986: logger
1987: .warn(Translate
1988: .get("virtualdatabase.distributed.configuration.checking.noanswer"));
1989:
1990: long highestRemoteControllerId = 0;
1991: for (Iterator iter = results.values().iterator(); iter
1992: .hasNext();) {
1993: Object response = iter.next();
1994: if (response instanceof VirtualDatabaseConfigurationResponse) {
1995: // These highestRemotecontrollerId and remoteControllerId are returned
1996: // directly by the remote controller, and are 'thus' of 'shifted
1997: // nature': effective bits = upper 16 bits. See
1998: // DistributedRequestManager.CONTROLLER_ID_BITS
1999: VirtualDatabaseConfigurationResponse vdbcr = (VirtualDatabaseConfigurationResponse) response;
2000: long remoteControllerId = vdbcr.getControllerId();
2001: if (remoteControllerId == INCOMPATIBLE_CONFIGURATION) {
2002: return INCOMPATIBLE_CONFIGURATION;
2003: }
2004: // Check if there still is a problem of missing vdb users.
2005: // If it is the case try to add them dynamically.
2006: if (logger.isWarnEnabled()) {
2007: logger
2008: .warn("Some virtual database users are missing from this configuration, trying to create them transparently...");
2009: }
2010: if (vdbcr.getAdditionalVdbUsers() != null) {
2011: for (Iterator iter2 = vdbcr.getAdditionalVdbUsers()
2012: .iterator(); iter2.hasNext();) {
2013: VirtualDatabaseUser vdbUser = (VirtualDatabaseUser) iter2
2014: .next();
2015:
2016: // Using the "super" trick here probably means bad design.
2017: // The intent is to create the vdb users just locally, hence we use
2018: // the method in
2019: // VirtualDatabase rather than the overridden method in
2020: // DistributedVirtual Database.
2021: super .checkAndAddVirtualDatabaseUser(vdbUser);
2022:
2023: if (!getAuthenticationManager()
2024: .isValidVirtualUser(vdbUser)) {
2025: return INCOMPATIBLE_CONFIGURATION;
2026: }
2027: }
2028: }
2029:
2030: if (highestRemoteControllerId < remoteControllerId)
2031: highestRemoteControllerId = remoteControllerId;
2032: } else {
2033: logger
2034: .error("Unexpected response while checking configuration compatibility: "
2035: + response);
2036: return INCOMPATIBLE_CONFIGURATION;
2037: }
2038: }
2039:
2040: // Ok, everybody agreed that our configuration is compatible.
2041: // Take the highest controller id + 1 as our id. (non-shifted, this is used
2042: // to pass in setControllerId which expects 16 bits)
2043: return ((highestRemoteControllerId >> DistributedRequestManager.CONTROLLER_ID_SHIFT_BITS) & DistributedRequestManager.CONTROLLER_ID_BITS) + 1;
2044: }
2045:
2046: /**
2047: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#checkAndAddVirtualDatabaseUser(org.continuent.sequoia.common.users.VirtualDatabaseUser)
2048: */
2049: public void checkAndAddVirtualDatabaseUser(
2050: VirtualDatabaseUser vdbUser) {
2051: // Is vdb user valid?
2052: MulticastResponse rspList;
2053: try {
2054: rspList = multicastRequestAdapter.multicastMessage(
2055: getAllMembers(), new IsValidUserForAllBackends(
2056: vdbUser), MulticastRequestAdapter.WAIT_ALL,
2057: messageTimeouts
2058: .getVirtualDatabaseConfigurationTimeout());
2059: } catch (NotConnectedException e) {
2060: logger.error(
2061: "Channel unavailable while checking validity of vdb user "
2062: + vdbUser.getLogin(), e);
2063: return;
2064: }
2065:
2066: // Check that everybody agreed
2067: Map results = rspList.getResults();
2068: int size = results.size();
2069: if (size == 0)
2070: logger
2071: .warn("No response while checking validity of vdb user "
2072: + vdbUser.getLogin());
2073: for (Iterator iter = results.values().iterator(); iter
2074: .hasNext();) {
2075: Object response = iter.next();
2076: if (response instanceof Boolean) {
2077: if (!((Boolean) response).booleanValue()) {
2078: if (logger.isWarnEnabled()) {
2079: logger
2080: .warn("Could not create new vdb user "
2081: + vdbUser.getLogin()
2082: + " because it does not exist on all backends");
2083: }
2084: return;
2085: }
2086: } else {
2087: logger
2088: .error("Unexpected response while checking validity of vdb user "
2089: + vdbUser.getLogin() + " : " + response);
2090: return;
2091: }
2092: }
2093:
2094: // Add user
2095: try {
2096: rspList = multicastRequestAdapter.multicastMessage(
2097: getAllMembers(),
2098: new AddVirtualDatabaseUser(vdbUser),
2099: MulticastRequestAdapter.WAIT_ALL, messageTimeouts
2100: .getVirtualDatabaseConfigurationTimeout());
2101: } catch (NotConnectedException e) {
2102: logger
2103: .error("Channel unavailable while adding vdb user "
2104: + vdbUser.getLogin()
2105: + ", trying to clean-up...", e);
2106: removeVirtualDatabaseUser(vdbUser);
2107: }
2108:
2109: // Check for exceptions
2110: results = rspList.getResults();
2111: size = results.size();
2112: if (size == 0)
2113: logger.warn("No response while adding vdb user "
2114: + vdbUser.getLogin());
2115: for (Iterator iter = results.values().iterator(); iter
2116: .hasNext();) {
2117: Object response = iter.next();
2118: if (response instanceof ControllerException) {
2119: if (logger.isErrorEnabled()) {
2120: logger.error("Error while adding vdb user "
2121: + vdbUser.getLogin()
2122: + ", trying to clean-up...");
2123: }
2124: removeVirtualDatabaseUser(vdbUser);
2125: return;
2126: }
2127: }
2128: }
2129:
2130: /**
2131: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#closePersistentConnection(java.lang.String,
2132: * long)
2133: */
2134: public void closePersistentConnection(String login,
2135: long persistentConnectionId) {
2136: distributedRequestManager.distributedClosePersistentConnection(
2137: login, persistentConnectionId);
2138: }
2139:
2140: /**
2141: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#openPersistentConnection(java.lang.String,
2142: * long)
2143: */
2144: public void openPersistentConnection(String login,
2145: long persistentConnectionId) throws SQLException {
2146: distributedRequestManager.distributedOpenPersistentConnection(
2147: login, persistentConnectionId);
2148: }
2149:
2150: /**
2151: * What this method does is really initiating the copy. It is the remote
2152: * controller's vdb that performs the actual copy, fetching the dump from this
2153: * vdb's local backuper.
2154: *
2155: * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#copyDump(java.lang.String,
2156: * java.lang.String)
2157: * @param dumpName the name of the dump to copy. Should exist locally, and not
2158: * remotely.
2159: * @param remoteControllerName the remote controller to talk to.
2160: * @throws VirtualDatabaseException in case of error.
2161: */
2162: public void copyDump(String dumpName, String remoteControllerName)
2163: throws VirtualDatabaseException {
2164: transferDump(dumpName, remoteControllerName, false);
2165: }
2166:
2167: /**
2168: * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#copyLogFromCheckpoint(java.lang.String,
2169: * java.lang.String)
2170: */
2171: public void copyLogFromCheckpoint(String dumpName,
2172: String controllerName) throws VirtualDatabaseException {
2173: // perform basic error checks (in particular, on success, we have a
2174: // recovery log)
2175: super .copyLogFromCheckpoint(dumpName, controllerName);
2176:
2177: Member controllerByName = getControllerByName(controllerName);
2178: if (isLocalSender(controllerByName))
2179: throw new VirtualDatabaseException(
2180: "A restore log command must be applied to a remote controller");
2181:
2182: // get the checkpoint name from the dump info, or die
2183: String dumpCheckpointName;
2184: DumpInfo dumpInfo;
2185:
2186: try {
2187: dumpInfo = getRecoveryLog().getDumpInfo(dumpName);
2188: } catch (SQLException e) {
2189: throw new VirtualDatabaseException(
2190: "Recovery log error access occured while checking for dump"
2191: + dumpName, e);
2192: }
2193:
2194: if (dumpInfo == null)
2195: throw new VirtualDatabaseException(
2196: "No information was found in the dump table for dump "
2197: + dumpName);
2198:
2199: RestoreLogOperation restoreLogOperation = new RestoreLogOperation(
2200: dumpName, controllerName);
2201: addAdminOperation(restoreLogOperation);
2202: try {
2203: dumpCheckpointName = dumpInfo.getCheckpointName();
2204:
2205: // set a global 'now' checkpoint (temporary) and suspend all activities
2206: String nowCheckpointName = setLogReplicationCheckpoint(controllerName);
2207:
2208: // AT THIS POINT, ALL ACTIVITIES ARE SUSPENDED
2209:
2210: // get its id (ewerk) so that we can replicate it on the other side
2211: long nowCheckpointId;
2212: RecoveryLog recoveryLog = getRequestManager()
2213: .getRecoveryLog();
2214: try {
2215: try {
2216: nowCheckpointId = recoveryLog
2217: .getCheckpointLogId(nowCheckpointName);
2218: } catch (SQLException e) {
2219: String errorMessage = "Cannot find 'now checkpoint' log entry";
2220: logger.error(errorMessage);
2221: throw new VirtualDatabaseException(errorMessage);
2222: }
2223:
2224: // initiate the replication - clears the remote recovery log.
2225: sendMessageToController(controllerByName,
2226: new ReplicateLogEntries(nowCheckpointName,
2227: null, dumpName, nowCheckpointId),
2228: messageTimeouts.getReplicateLogEntriesTimeout());
2229: } finally {
2230: getRequestManager().resumeActivity();
2231: }
2232:
2233: // SCHEDULER ACTIVITIES ARE RESUMES AT THIS POINT
2234:
2235: // protect from concurrent log updates: fake a recovery (increments
2236: // semaphore)
2237: recoveryLog.beginRecovery();
2238:
2239: // copy the entries over to the remote controller.
2240: // Send them one by one over to the remote controller, coz each LogEntry
2241: // can
2242: // potentially be huge (e.g. if it contains a blob)
2243: try {
2244: ArrayList dest = new ArrayList();
2245: dest.add(controllerByName);
2246: long copyLogEntryTimeout = getMessageTimeouts()
2247: .getCopyLogEntryTimeout();
2248: long dumpId = recoveryLog
2249: .getCheckpointLogId(dumpCheckpointName);
2250:
2251: if (logger.isDebugEnabled()) {
2252: logger.debug("Resynchronizing from checkpoint "
2253: + dumpCheckpointName + " (" + dumpId
2254: + ") to checkpoint " + nowCheckpointName
2255: + " (" + nowCheckpointId + ")");
2256: }
2257:
2258: for (long id = dumpId; id < nowCheckpointId; id++) {
2259: LogEntry entry = recoveryLog.getNextLogEntry(id);
2260: if (entry == null) {
2261: // No more entries available, stop here
2262: break;
2263: }
2264:
2265: // Because 'getNextLogEntry()' will hunt for the next valid log entry,
2266: // we need to update the iterator with the new id value - 1
2267: id = entry.getLogId() - 1;
2268:
2269: MulticastResponse resp = getMulticastRequestAdapter()
2270: .multicastMessage(dest,
2271: new CopyLogEntry(entry),
2272: MulticastRequestAdapter.WAIT_NONE,
2273: copyLogEntryTimeout);
2274: if (resp.getFailedMembers() != null)
2275: throw new IOException(
2276: "Failed to deliver log entry " + id
2277: + " to remote controller "
2278: + controllerName);
2279: }
2280:
2281: // Now check that no entry was missed by the other controller since we
2282: // shipped all entries asynchronously without getting any individual ack
2283: // (much faster to address SEQUOIA-504)
2284: long localNbOfLogEntries = recoveryLog
2285: .getNumberOfLogEntries(dumpId, nowCheckpointId);
2286:
2287: if (logger.isDebugEnabled()) {
2288: logger
2289: .debug("Checking that "
2290: + localNbOfLogEntries
2291: + " entries were resynchronized in remote log");
2292: }
2293:
2294: Serializable replyValue = sendMessageToController(
2295: controllerByName,
2296: new CompleteRecoveryLogResync(dumpId,
2297: nowCheckpointName, localNbOfLogEntries),
2298: getMessageTimeouts()
2299: .getReplicateLogEntriesTimeout());
2300: if (replyValue instanceof Long) {
2301: long diff = ((Long) replyValue).longValue();
2302: if (diff != 0)
2303: throw new VirtualDatabaseException(
2304: "Recovery log resynchronization reports a difference of "
2305: + diff + " entries");
2306: } else
2307: throw new RuntimeException(
2308: "Invalid answer from remote controller on CompleteRecoveryLogResync ("
2309: + replyValue + ")");
2310:
2311: // terminate the replication - sets the remote dump checkpoint name.
2312: sendMessageToController(controllerName,
2313: new ReplicateLogEntries(null,
2314: dumpCheckpointName, dumpName, dumpId),
2315: messageTimeouts.getReplicateLogEntriesTimeout());
2316: } catch (Exception e) {
2317: String errorMessage = "Failed to send log entries";
2318: logger.error(errorMessage, e);
2319: throw new VirtualDatabaseException(errorMessage);
2320: } finally {
2321: recoveryLog.endRecovery(); // release semaphore
2322: }
2323: } finally {
2324: removeAdminOperation(restoreLogOperation);
2325: }
2326: }
2327:
2328: /**
2329: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForPersistentConnection(long)
2330: */
2331: public void failoverForPersistentConnection(
2332: long persistentConnectionId) {
2333: distributedRequestManager
2334: .distributedFailoverForPersistentConnection(persistentConnectionId);
2335: }
2336:
2337: /**
2338: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#failoverForTransaction(long)
2339: */
2340: public void failoverForTransaction(long currentTid) {
2341: distributedRequestManager
2342: .distributedFailoverForTransaction(currentTid);
2343: }
2344:
2345: /**
2346: * Returns the recovery log associated with this controller.
2347: *
2348: * @return the recovery log associated with this controller.
2349: * @throws VirtualDatabaseException if the database has not recovery log
2350: */
2351: public RecoveryLog getRecoveryLog() throws VirtualDatabaseException {
2352: if (!hasRecoveryLog())
2353: throw new VirtualDatabaseException(Translate
2354: .get("virtualdatabase.no.recovery.log"));
2355:
2356: return getRequestManager().getRecoveryLog();
2357: }
2358:
2359: /**
2360: * Update remote backends list after a backend disable notification has been
2361: * received.
2362: *
2363: * @param disabledBackend backend that is disabled
2364: * @param sender the message sender
2365: */
2366: public void handleRemoteDisableBackendNotification(
2367: DatabaseBackend disabledBackend, Member sender) {
2368: synchronized (backendsPerController) {
2369: List remoteBackends = (List) backendsPerController
2370: .get(sender);
2371: if (remoteBackends == null) { // This case was reported by Alessandro Gamboz on April 1, 2005.
2372: // It looks like the EnableBackend message arrives before membership
2373: // has been properly updated.
2374: logger
2375: .warn("No information has been found for remote controller "
2376: + sender);
2377: remoteBackends = new ArrayList();
2378: backendsPerController.put(sender, remoteBackends);
2379: }
2380: int size = remoteBackends.size();
2381: boolean backendFound = false;
2382: for (int i = 0; i < size; i++) {
2383: DatabaseBackend remoteBackend = (DatabaseBackend) remoteBackends
2384: .get(i);
2385: if (remoteBackend.equals(disabledBackend)) {
2386: logger.info("Backend " + remoteBackend.getName()
2387: + " disabled on controller " + sender);
2388: remoteBackends.set(i, disabledBackend);
2389: backendFound = true;
2390: break;
2391: }
2392: }
2393: if (!backendFound) {
2394: logger
2395: .warn("Updating backend list with unknown backend "
2396: + disabledBackend.getName()
2397: + " disabled on controller " + sender);
2398: remoteBackends.add(disabledBackend);
2399: }
2400: }
2401: }
2402:
2403: /**
2404: * Update remote backends list after a backend disable notification has been
2405: * received.
2406: *
2407: * @param disabledBackendInfos List of BackendInfo objects that are disabled
2408: * @param sender the message sender
2409: */
2410: public void handleRemoteDisableBackendsNotification(
2411: ArrayList disabledBackendInfos, Member sender) {
2412: synchronized (backendsPerController) {
2413: List remoteBackends = (List) backendsPerController
2414: .get(sender);
2415: if (remoteBackends == null) { // This case was reported by Alessandro Gamboz on April 1, 2005.
2416: // It looks like the EnableBackend message arrives before membership
2417: // has been properly updated.
2418: logger
2419: .warn("No information has been found for remote controller "
2420: + sender);
2421: remoteBackends = new ArrayList();
2422: backendsPerController.put(sender, remoteBackends);
2423: }
2424: Iterator iter = disabledBackendInfos.iterator();
2425: while (iter.hasNext()) {
2426: BackendInfo backendInfo = (BackendInfo) iter.next();
2427: DatabaseBackend backend = backendInfo
2428: .getDatabaseBackend();
2429:
2430: if (remoteBackends.contains(backend)) {
2431: logger.info("Backend " + backend.getName()
2432: + " disabled on controller " + sender);
2433: remoteBackends.set(remoteBackends.indexOf(backend),
2434: backend);
2435: } else {
2436: remoteBackends.add(backend);
2437: logger
2438: .warn("Updating backend list with unknown backend "
2439: + backendInfo.getName()
2440: + " disabled on controller "
2441: + sender);
2442: }
2443: }
2444: }
2445: }
2446:
2447: /**
2448: * Sent the local controller configuration to a remote controller
2449: *
2450: * @param dest the membership of the controller so send the information to
2451: * @throws NotConnectedException if the group communication channel is not
2452: * conected
2453: */
2454: public void sendLocalConfiguration(Member dest)
2455: throws NotConnectedException {
2456: // Send controller name to new comer
2457: if (logger.isDebugEnabled())
2458: logger
2459: .debug("Sending local controller name to joining controller ("
2460: + dest + ")");
2461:
2462: List target = new ArrayList();
2463: target.add(dest);
2464: multicastRequestAdapter.multicastMessage(target,
2465: new ControllerInformation(controller
2466: .getControllerName(), controller.getJmxName(),
2467: distributedRequestManager.getControllerId()),
2468: MulticastRequestAdapter.WAIT_ALL, messageTimeouts
2469: .getControllerNameTimeout());
2470:
2471: // Send backend status
2472: if (logger.isDebugEnabled()) {
2473: logger
2474: .debug("Sending backend status name to joining controller ("
2475: + dest + ")");
2476: }
2477: List backendInfos = DatabaseBackend.toBackendInfos(backends);
2478: multicastRequestAdapter.multicastMessage(target,
2479: new BackendStatus(backendInfos,
2480: distributedRequestManager.getControllerId()),
2481: MulticastRequestAdapter.WAIT_ALL, messageTimeouts
2482: .getBackendStatusTimeout());
2483: }
2484:
2485: /**
2486: * Returns true if the corresponding controller is alive
2487: *
2488: * @param controllerId Id of the controller to check
2489: * @return true if controller is in controllerIds list
2490: */
2491: public boolean isAliveController(Long controllerId) {
2492: return controllerIds.containsValue(controllerId);
2493: }
2494:
2495: /**
2496: * Returns true if the given member is ourselves.
2497: *
2498: * @param sender the sender
2499: * @return true if we are the sender, false otherwise
2500: */
2501: public boolean isLocalSender(Member sender) {
2502: return channel.getLocalMembership().equals(sender);
2503: }
2504:
2505: /**
2506: * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#removeBackend(java.lang.String)
2507: */
2508: public void removeBackend(String backend)
2509: throws VirtualDatabaseException {
2510: super .removeBackend(backend);
2511:
2512: try {
2513: // Send a group message to update backend list
2514: broadcastBackendInformation(getAllMemberButUs());
2515: } catch (Exception e) {
2516: String msg = "An error occured while multicasting new backedn information";
2517: logger.error(msg, e);
2518: throw new VirtualDatabaseException(msg, e);
2519: }
2520: }
2521:
2522: /**
2523: * Remove a remote controller (usually because it has failed) from the
2524: * controllerMap list and refresh the group membership. This also start a
2525: * ControllerFailureCleanupThread.
2526: *
2527: * @param remoteControllerMembership the membership identifying the remote
2528: * controller
2529: * @return the JMX name of the removed controller (or null if this controller
2530: * was not in the list)
2531: */
2532: private String removeRemoteControllerAndStartCleanupThread(
2533: Member remoteControllerMembership) {
2534: String remoteControllerJmxName = (String) controllerJmxAddress
2535: .remove(remoteControllerMembership);
2536: if (logger.isDebugEnabled())
2537: logger.debug("Removing controller "
2538: + remoteControllerJmxName);
2539:
2540: // Remove the list of remote backends since they are no more reachable
2541: backendsPerController.remove(remoteControllerMembership);
2542: refreshGroupMembership();
2543:
2544: // Resume ongoing activity suspensions originated from the failed controller
2545: resumeOngoingActivitySuspensions(remoteControllerMembership);
2546:
2547: // Retrieve id of controller that failed and start a cleanup thread to
2548: // eliminate any remaining transactions/remaining persistent connections if
2549: // no client failover occurs in the defined timeframe
2550: Long failedControllerId = (Long) controllerIds
2551: .remove(remoteControllerMembership);
2552:
2553: if (failedControllerId != null) {
2554: ControllerFailureCleanupThread controllerFailureCleanupThread = new ControllerFailureCleanupThread(
2555: this , failedControllerId.longValue(),
2556: failoverTimeoutInMs, cleanupThreads,
2557: groupCommunicationMessagesLocallyFlushed);
2558: cleanupThreads.put(failedControllerId,
2559: controllerFailureCleanupThread);
2560: controllerFailureCleanupThread.start();
2561: }
2562:
2563: return remoteControllerJmxName;
2564: }
2565:
2566: /**
2567: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#removeVirtualDatabaseUser(org.continuent.sequoia.common.users.VirtualDatabaseUser)
2568: */
2569: private void removeVirtualDatabaseUser(VirtualDatabaseUser vdbUser) {
2570: try {
2571: multicastRequestAdapter.multicastMessage(getAllMembers(),
2572: new RemoveVirtualDatabaseUser(vdbUser),
2573: MulticastRequestAdapter.WAIT_NONE, messageTimeouts
2574: .getVirtualDatabaseConfigurationTimeout());
2575: } catch (NotConnectedException e) {
2576: logger.error("Channel unavailable while removing vdb user "
2577: + vdbUser.getLogin(), e);
2578: return;
2579: }
2580: }
2581:
2582: /**
2583: * Set a cluster-wide checkpoint relying on the implementation of
2584: * SetCheckpoint to atomically set the checkpoint on all controllers. This
2585: * method leaves writes, transactions, and persistent connections suspended.
2586: * The caller must call RequestManager.resumeActivity() when the processing
2587: * associated with this checkpoint is complete.
2588: *
2589: * @param checkpointName the name of the (transfer) checkpoint to create
2590: * @param groupMembers an ArrayList of target Members
2591: * @throws VirtualDatabaseException in case of scheduler or recoveryLog
2592: * exceptions
2593: * @see SetCheckpointAndResumeTransactions
2594: */
2595: public void setGroupCheckpoint(String checkpointName,
2596: ArrayList groupMembers) throws VirtualDatabaseException {
2597: try {
2598: // First suspend transactions
2599: distributedRequestManager.suspendActivity();
2600: getMulticastRequestAdapter().multicastMessage(
2601: groupMembers,
2602: new DisableBackendsAndSetCheckpoint(
2603: new ArrayList(), checkpointName),
2604: MulticastRequestAdapter.WAIT_ALL,
2605: messageTimeouts.getSetCheckpointTimeout());
2606: } catch (Exception e) {
2607: String msg = "Set group checkpoint failed: checkpointName="
2608: + checkpointName;
2609: logger.error(msg, e);
2610: throw new VirtualDatabaseException(msg);
2611: }
2612: }
2613:
2614: /**
2615: * Sets an atomic (group-wide) checkpoint on local & target controllers
2616: * (referenced by Name).
2617: *
2618: * @param controllerName the target remote controller
2619: * @param suspendActivity Indicates whether or not system activity should
2620: * remain suspended when this request completes.
2621: * @return the 'now' checkpoint name.
2622: * @throws VirtualDatabaseException in case of error (whatever error, wraps
2623: * the underlying error)
2624: */
2625: private String setLogReplicationCheckpoint(String controllerName)
2626: throws VirtualDatabaseException {
2627: return setLogReplicationCheckpoint(getControllerByName(controllerName));
2628: }
2629:
2630: /**
2631: * Sets an atomic (group-wide) checkpoint on local & target controllers
2632: * (refrenced by Name). When this call completes sucessfully, all activity on
2633: * the system is suspend. RequestManager.resumeActivity() must be called to
2634: * resume processing.
2635: *
2636: * @param controller the target remote controller
2637: * @return the 'now' checkpoint name.
2638: * @throws VirtualDatabaseException in case of error (whatever error, wraps
2639: * the underlying error)
2640: */
2641: public String setLogReplicationCheckpoint(Member controller)
2642: throws VirtualDatabaseException {
2643: String checkpointName = buildCheckpointName("now");
2644:
2645: // Apply checkpoint to remote controllers
2646: ArrayList dest = new ArrayList();
2647: dest.add(controller);
2648: dest.add(channel.getLocalMembership());
2649: setGroupCheckpoint(checkpointName, dest);
2650: return checkpointName;
2651: }
2652:
2653: /**
2654: * Sets an atomic (group-wide) checkpoint on all controllers, indicating that
2655: * this vdb has shutdown.
2656: */
2657: public void setShutdownCheckpoint() {
2658: // Set a cluster-wide checkpoint
2659: try {
2660: setGroupCheckpoint(buildCheckpointName("shutdown"),
2661: getAllMembers());
2662: } catch (VirtualDatabaseException e) {
2663: logger.warn("Error while setting shutdown checkpoint", e);
2664: } finally {
2665: if (isShuttingDown())
2666: setRejectingNewTransaction(true);
2667: distributedRequestManager.resumeActivity();
2668: }
2669: }
2670:
2671: /**
2672: * Send a Message to a remote controller, referenced by name. This sends a
2673: * point-to-point message, fifo. No total order is specifically required.
2674: *
2675: * @param controllerName name of the remote controller
2676: * @param message the message to send (should be Serializable)
2677: * @param timeout message timeout in ms
2678: * @throws VirtualDatabaseException (wrapping error) in case of communication
2679: * failure
2680: */
2681: private void sendMessageToController(String controllerName,
2682: Serializable message, long timeout)
2683: throws VirtualDatabaseException {
2684: sendMessageToController(getControllerByName(controllerName),
2685: message, timeout);
2686: }
2687:
2688: /**
2689: * Send a Message to a remote controller, referenced by its Member. This sends
2690: * a point-to-point message, fifo. No total order is specifically required
2691: * (but enforced anyway).
2692: *
2693: * @param controllerMember Member object refering to the remote controller
2694: * @param message the message to send (should be Serializable)
2695: * @param timeout message timeout in ms
2696: * @return the result returned by the remote controller (except if this an
2697: * exception in which case it is automatically thrown)
2698: * @throws VirtualDatabaseException (wrapping error) in case of communication
2699: * failure
2700: */
2701: public Serializable sendMessageToController(
2702: Member controllerMember, Serializable message, long timeout)
2703: throws VirtualDatabaseException {
2704: try {
2705: ArrayList dest = new ArrayList();
2706: dest.add(controllerMember);
2707: MulticastResponse resp = getMulticastRequestAdapter()
2708: .multicastMessage(dest, message,
2709: MulticastRequestAdapter.WAIT_ALL, timeout);
2710: Object o = resp.getResult(controllerMember);
2711: if (o instanceof Exception)
2712: throw (Exception) o;
2713: return (Serializable) o;
2714: } catch (Exception e) {
2715: logger.error(e);
2716: throw new VirtualDatabaseException(e);
2717: }
2718: }
2719:
2720: /**
2721: * Send a Message to a set of controllers. This sends a multicast message,
2722: * fifo and checks for returned exceptions. No total order is specifically
2723: * required (but enforced anyway). If an exception is returned, it is
2724: * rethrown.
2725: *
2726: * @param members ArrayList of Member object refering to controllers
2727: * @param message the message to send (should be Serializable)
2728: * @param timeout message timeout in ms
2729: * @return the result returned (except if one controller returned an exception
2730: * in which case it is automatically thrown)
2731: * @throws VirtualDatabaseException (wrapping error) in case of communication
2732: * failure
2733: */
2734: public MulticastResponse sendMessageToControllers(
2735: ArrayList members, Serializable message, long timeout)
2736: throws VirtualDatabaseException {
2737: try {
2738: MulticastResponse resp = getMulticastRequestAdapter()
2739: .multicastMessage(members, message,
2740: MulticastRequestAdapter.WAIT_ALL, timeout);
2741: Iterator it = resp.getResults().keySet().iterator();
2742: while (it.hasNext()) {
2743: Object o = resp.getResults().get(it.next());
2744: // TODO: return compound exception instead of the first one
2745: if (o instanceof Exception)
2746: throw (Exception) o;
2747: }
2748: return resp;
2749: } catch (Exception e) {
2750: logger.error(e);
2751: throw new VirtualDatabaseException(e);
2752: }
2753: }
2754:
2755: /**
2756: * @see org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase#shutdown(int)
2757: */
2758: public void shutdown(int level) {
2759: ActivityService.getInstance().reset(name);
2760: if (partitionReconciler != null) {
2761: partitionReconciler.dispose();
2762: }
2763:
2764: // Shutdown cleanup threads
2765: for (Iterator iter = cleanupThreads.values().iterator(); iter
2766: .hasNext();) {
2767: ControllerFailureCleanupThread thread = (ControllerFailureCleanupThread) iter
2768: .next();
2769: thread.shutdown();
2770: }
2771:
2772: // Shutdown request result failover cache clean-up thread
2773: requestResultFailoverCache.shutdown();
2774:
2775: super .shutdown(level);
2776: }
2777:
2778: /**
2779: * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#transferBackend(java.lang.String,
2780: * java.lang.String)
2781: */
2782: public void transferBackend(String backend,
2783: String controllerDestination)
2784: throws VirtualDatabaseException {
2785: TransferBackendOperation transferOperation = new TransferBackendOperation(
2786: backend, controllerDestination);
2787: addAdminOperation(transferOperation);
2788: try {
2789: Member targetMember = getControllerByName(controllerDestination);
2790:
2791: // Get reference on backend
2792: DatabaseBackend db = getAndCheckBackend(backend,
2793: CHECK_BACKEND_DISABLE);
2794: String transfertCheckpointName = buildCheckpointName("transfer backend: "
2795: + db.getName()
2796: + " from "
2797: + controller.getControllerName()
2798: + " to "
2799: + targetMember.getUid());
2800:
2801: if (logger.isDebugEnabled())
2802: logger.debug("**** Disabling backend for transfer");
2803:
2804: // Disable local backend
2805: try {
2806: if (!hasRecoveryLog())
2807: throw new VirtualDatabaseException(
2808: "Transfer is not supported on virtual databases without a recovery log");
2809:
2810: distributedRequestManager.disableBackendWithCheckpoint(
2811: db, transfertCheckpointName);
2812: } catch (SQLException e) {
2813: throw new VirtualDatabaseException(e.getMessage());
2814: }
2815:
2816: // Enable remote transfered backend.
2817: try {
2818: if (logger.isDebugEnabled())
2819: logger.debug("**** Sending transfer message to:"
2820: + targetMember);
2821:
2822: ArrayList dest = new ArrayList(1);
2823: dest.add(targetMember);
2824:
2825: sendMessageToController(targetMember,
2826: new BackendTransfer(controllerDestination,
2827: transfertCheckpointName,
2828: new BackendInfo(db)), messageTimeouts
2829: .getBackendTransferTimeout());
2830:
2831: if (logger.isDebugEnabled())
2832: logger.debug("**** Removing local backend");
2833:
2834: // Remove backend from this controller
2835: removeBackend(db);
2836:
2837: // Broadcast updated backend list
2838: broadcastBackendInformation(getAllMemberButUs());
2839: } catch (Exception e) {
2840: String msg = "An error occured while transfering the backend";
2841: logger.error(msg, e);
2842: throw new VirtualDatabaseException(msg, e);
2843: }
2844: } finally {
2845: removeAdminOperation(transferOperation);
2846: }
2847: }
2848:
2849: /**
2850: * @see org.continuent.sequoia.common.jmx.mbeans.VirtualDatabaseMBean#transferDump(java.lang.String,
2851: * java.lang.String, boolean)
2852: */
2853: public void transferDump(String dumpName,
2854: String remoteControllerName, boolean noCopy)
2855: throws VirtualDatabaseException {
2856: TransferDumpOperation transferOperation = new TransferDumpOperation(
2857: dumpName, remoteControllerName);
2858: addAdminOperation(transferOperation);
2859: try {
2860: // get the info from the backuper
2861: DumpInfo dumpInfo = null;
2862: try {
2863: dumpInfo = getRecoveryLog().getDumpInfo(dumpName);
2864: /*
2865: * getDumpInfo() is the one that throws SQLException (should it be a
2866: * VirtualDatabaseException instead ???)
2867: */
2868: } catch (SQLException e) {
2869: String msg = "getting dump info from backup manager failed";
2870: throw new VirtualDatabaseException(msg, e);
2871: }
2872:
2873: if (dumpInfo == null)
2874: throw new VirtualDatabaseException(
2875: "no dump info for dump '" + dumpName + "'");
2876:
2877: if (remoteControllerName.equals(controller.getJmxName()))
2878: throw new VirtualDatabaseException(
2879: "Not transfering dump to myself");
2880:
2881: // if a copy is needed, hand-off copy to backuper: setup server side of
2882: // the
2883: // copy
2884: DumpTransferInfo dumpTransferInfo = null;
2885: if (!noCopy) {
2886: try {
2887: dumpTransferInfo = getRequestManager()
2888: .getBackupManager().getBackuperByFormat(
2889: dumpInfo.getDumpFormat())
2890: .setupDumpServer();
2891: } catch (IOException e) {
2892: throw new VirtualDatabaseException(e);
2893: }
2894: }
2895:
2896: // send message to remote vdb instance, to act as a client
2897: // (see handleInitiateDumpCopy)
2898: sendMessageToController(remoteControllerName,
2899: new InitiateDumpCopy(dumpInfo, dumpTransferInfo),
2900: messageTimeouts.getInitiateDumpCopyTimeout());
2901: } finally {
2902: removeAdminOperation(transferOperation);
2903: }
2904: }
2905:
2906: /**
2907: * If we are executing in a distributed virtual database, we have to make sure
2908: * that we post the query in the queue following the total order. This method
2909: * does not remove the request from the total order queue. You have to call
2910: * removeHeadFromAndNotifyTotalOrderQueue() to do so.
2911: *
2912: * @param request the request to wait for (can be any object but usually a
2913: * DistributedRequest, Commit or Rollback)
2914: * @param errorIfNotFound true if an error message should be logged if the
2915: * request is not found in the total order queue
2916: * @return true if the element was found and wait has succeeded, false
2917: * otherwise
2918: */
2919: public boolean waitForTotalOrder(Object request,
2920: boolean errorIfNotFound) {
2921: synchronized (totalOrderQueue) {
2922: int index = totalOrderQueue.indexOf(request);
2923: while (index > 0) {
2924: if (logger.isDebugEnabled())
2925: logger.debug("Waiting for " + index
2926: + " queries to execute (current is "
2927: + totalOrderQueue.get(0) + ")");
2928: try {
2929: totalOrderQueue.wait();
2930: } catch (InterruptedException ignore) {
2931: }
2932: index = totalOrderQueue.indexOf(request);
2933: }
2934: if (index == -1) {
2935: if (errorIfNotFound)
2936: logger
2937: .error("Request was not found in total order queue, posting out of order ("
2938: + request + ")");
2939: return false;
2940: } else
2941: return true;
2942: }
2943: }
2944:
2945: /**
2946: * Variant of the waitForTotalOrder() method, please refer to its
2947: * documentation. The difference here is that we only wait for request of type
2948: * SuspendWritesMessage which are in front of the given request. All other
2949: * request are by-passed.
2950: *
2951: * @see ResumeActivity
2952: * @see BlockActivity
2953: * @see SuspendActivity
2954: * @param request the request to wait for (expected to be a
2955: * ResumeActivityMessage)
2956: * @param errorIfNotFound true if an error message should be logged if the
2957: * request is not found in the total order queue
2958: * @return true if the element was found and wait has succeeded, false
2959: * otherwise
2960: */
2961: public boolean waitForBlockAndSuspendInTotalOrder(Object request,
2962: boolean errorIfNotFound) {
2963: synchronized (totalOrderQueue) {
2964: // Init
2965: int index = totalOrderQueue.indexOf(request);
2966: boolean shouldWait = false;
2967: for (int i = 0; i < index; i++)
2968: if ((totalOrderQueue.get(i) instanceof SuspendWritesMessage))
2969: shouldWait = true;
2970:
2971: // Main loop
2972: while ((index > 0) && shouldWait) {
2973: System.out.println("index=" + index + " shouldWait="
2974: + shouldWait);
2975: if (logger.isDebugEnabled())
2976: logger.debug("Waiting for " + index
2977: + " queries to execute (current is "
2978: + totalOrderQueue.get(0) + ")");
2979: try {
2980: totalOrderQueue.wait();
2981: } catch (InterruptedException ignore) {
2982: }
2983: index = totalOrderQueue.indexOf(request);
2984: shouldWait = false;
2985: for (int i = 0; i < index; i++)
2986: if ((totalOrderQueue.get(i) instanceof SuspendWritesMessage))
2987: shouldWait = true;
2988: }
2989:
2990: if (index == -1) {
2991: if (errorIfNotFound)
2992: logger
2993: .error("Request was not found in total order queue, posting out of order ("
2994: + request + ")");
2995: return false;
2996: } else
2997: return true;
2998: }
2999: }
3000:
3001: /**
3002: * Adds an ongoing activity suspension marker to the current list only if
3003: * suspension was not triggered by the local member.
3004: *
3005: * @param controllerMember member which triggered the activity suspension
3006: */
3007: public void addOngoingActivitySuspension(Member controllerMember) {
3008: if (controllerMember.equals(multicastRequestAdapter
3009: .getChannel().getLocalMembership()))
3010: return;
3011: ongoingActivitySuspensions.add(controllerMember);
3012: }
3013:
3014: /**
3015: * Removes an ongoing activity suspension marker from the current list only if
3016: * resuming was not triggered by the local member.
3017: *
3018: * @param controllerMember member which triggered the activity resuming
3019: */
3020: public void removeOngoingActivitySuspension(Member controllerMember) {
3021: if (controllerMember.equals(multicastRequestAdapter
3022: .getChannel().getLocalMembership()))
3023: return;
3024: ongoingActivitySuspensions.remove(controllerMember);
3025: }
3026:
3027: // Resume ongoing activity suspensions originated from the failed controller
3028: private void resumeOngoingActivitySuspensions(
3029: Member controllerMember) {
3030: List dest = new ArrayList();
3031: dest.add(multicastRequestAdapter.getChannel()
3032: .getLocalMembership());
3033:
3034: for (Iterator iter = ongoingActivitySuspensions.iterator(); iter
3035: .hasNext();) {
3036: Member m = (Member) iter.next();
3037: if (m.equals(controllerMember))
3038: try {
3039: multicastRequestAdapter.multicastMessage(dest,
3040: new ResumeActivity(),
3041: MulticastRequestAdapter.WAIT_ALL,
3042: messageTimeouts.getDisableBackendTimeout());
3043: // Need to remove the marker here because the sender of the
3044: // ResumeActivity message will be the local member
3045: iter.remove();
3046: } catch (NotConnectedException e) {
3047: if (logger.isWarnEnabled())
3048: logger.warn(
3049: "Problem when trying to resume ongoing activity suspensions triggered by "
3050: + controllerMember, e);
3051: }
3052: }
3053: }
3054: }
|