001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2006 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: * Initial developer(s): Emmanuel Cecchet.
019: * Contributor(s): ______________________.
020: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
021:
022: import java.io.Serializable;
023: import java.util.LinkedList;
024:
025: import org.continuent.hedera.common.Member;
026: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
027: import org.continuent.sequoia.common.i18n.Translate;
028: import org.continuent.sequoia.common.log.Trace;
029: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
030: import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
031:
032: /**
033: * This class defines a CompleteRecoveryLogResync message used to check that the
034: * recovery log resync happened properly.
035: *
036: * @author <a href="mailto:emmanuel.cecchet@continuent.com">Emmanuel Cecchet</a>
037: * @version 1.0
038: */
039: public class CompleteRecoveryLogResync extends
040: DistributedVirtualDatabaseMessage {
041: private static final long serialVersionUID = 5811295250708240645L;
042:
043: private long commonCheckpointId;
044: private String commonCheckpointName;
045: private String nowCheckpointName;
046: private long nbOfEntriesToResync;
047:
048: private transient LinkedList totalOrderQueue;
049:
050: /**
051: * Creates a new <code>CompleteRecoveryLogResync</code> object. <br>
052: * This object will be used to check that the restore log operation did not
053: * generate any inconcistency (by checking that the number of copied log
054: * entries from commonCheckpointId to the log id corresponding to
055: * nowCheckpointName is equal to nbOfEntriesToResync)
056: *
057: * @param commonCheckpointId common checkpoint id where resync will start
058: * @param nowCheckpointName newly inserted checkpoint where resync will end
059: * @param nbOfEntriesToResync number of entries to be resynchronized between
060: * these 2 checkpoints
061: */
062: public CompleteRecoveryLogResync(long commonCheckpointId,
063: String nowCheckpointName, long nbOfEntriesToResync) {
064: this .commonCheckpointId = commonCheckpointId;
065: this .nowCheckpointName = nowCheckpointName;
066: this .nbOfEntriesToResync = nbOfEntriesToResync;
067: }
068:
069: /**
070: * Creates a new <code>CompleteRecoveryLogResync</code> object<br>
071: * It will be used there to check that the automatic resync of recovery logs
072: * worked well, by verifying that the number of log entries that has been
073: * transfered is the expected one (nbOfEntriesToResync) between the two log
074: * entries identified by commonCheckpointName and nowCheckpointName.
075: *
076: * @param commonCheckpointName common checkpoint name where resync will start
077: * @param nowCheckpointName newly inserted checkpoint where resync will end
078: * @param nbOfEntriesToResync number of entries to be resynchronized between
079: * these 2 checkpoints
080: */
081: public CompleteRecoveryLogResync(String commonCheckpointName,
082: String nowCheckpointName, long nbOfEntriesToResync) {
083: this .commonCheckpointName = commonCheckpointName;
084: this .nowCheckpointName = nowCheckpointName;
085: this .nbOfEntriesToResync = nbOfEntriesToResync;
086: }
087:
088: /**
089: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageSingleThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
090: * org.continuent.hedera.common.Member)
091: */
092: public Object handleMessageSingleThreaded(
093: DistributedVirtualDatabase dvdb, Member sender) {
094: if (!dvdb.hasRecoveryLog())
095: return new VirtualDatabaseException(Translate
096: .get("virtualdatabase.no.recovery.log"));
097:
098: totalOrderQueue = dvdb.getTotalOrderQueue();
099: if (totalOrderQueue == null)
100: return new VirtualDatabaseException(Translate.get(
101: "virtualdatabase.no.total.order.queue", dvdb
102: .getVirtualDatabaseName()));
103:
104: synchronized (totalOrderQueue) {
105: totalOrderQueue.addLast(this );
106: return this ;
107: }
108: }
109:
110: /**
111: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageMultiThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
112: * org.continuent.hedera.common.Member, java.lang.Object)
113: */
114: public Serializable handleMessageMultiThreaded(
115: DistributedVirtualDatabase dvdb, Member sender,
116: Object handleMessageSingleThreadedResult) {
117: if (!dvdb.hasRecoveryLog())
118: return new VirtualDatabaseException(Translate
119: .get("virtualdatabase.no.recovery.log"));
120:
121: Trace logger = dvdb.getLogger();
122: try {
123: // Wait for our turn to execute
124: if (!dvdb.waitForTotalOrder(
125: handleMessageSingleThreadedResult, false))
126: logger
127: .error("CompleteRecoveryLogResync was not found in total order queue, posting out of order ("
128: + commonCheckpointName + ")");
129: else
130: synchronized (totalOrderQueue) {
131: totalOrderQueue.removeFirst();
132: totalOrderQueue.notifyAll();
133: }
134:
135: RecoveryLog recoveryLog = dvdb.getRequestManager()
136: .getRecoveryLog();
137: long nowCheckpointId = recoveryLog
138: .getCheckpointLogId(nowCheckpointName);
139: if (commonCheckpointName != null) {
140: commonCheckpointId = recoveryLog
141: .getCheckpointLogId(commonCheckpointName);
142: }
143: long localNbOfLogEntries = recoveryLog
144: .getNumberOfLogEntries(commonCheckpointId,
145: nowCheckpointId);
146: long diff = localNbOfLogEntries - nbOfEntriesToResync;
147:
148: if (logger.isDebugEnabled()) {
149: logger
150: .debug("Recovery log is being resynchronized between "
151: + commonCheckpointName
152: + "and "
153: + nowCheckpointId
154: + " ("
155: + nowCheckpointName + ")");
156: logger.debug("Recovery log has resynchronized "
157: + localNbOfLogEntries
158: + "entries (diff with remote controller is "
159: + diff + ")");
160: }
161:
162: if (diff != 0)
163: logger
164: .error("Detected inconsistency in restore log operation, logs differ by "
165: + diff
166: + " entries (original was "
167: + nbOfEntriesToResync
168: + ", local is "
169: + localNbOfLogEntries + ")");
170: else
171: logger
172: .info("Recovery log re-synchronized "
173: + nbOfEntriesToResync
174: + " entries successfully");
175:
176: // Send the difference
177: return new Long(diff);
178: } catch (Exception e) {
179: logger
180: .error(
181: "Unable to complete recovery log resynchronization",
182: e);
183: return new VirtualDatabaseException(
184: "Unable to complete recovery log resynchronization",
185: e);
186: }
187: }
188: }
|