001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2006 Continuent, Inc.
004: * Contact: sequoia@continuent.org
005: *
006: * Licensed under the Apache License, Version 2.0 (the "License");
007: * you may not use this file except in compliance with the License.
008: * You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing, software
013: * distributed under the License is distributed on an "AS IS" BASIS,
014: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015: * See the License for the specific language governing permissions and
016: * limitations under the License.
017: *
018: * Initial developer(s): Emmanuel Cecchet.
019: * Contributor(s): ______________________.
020: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
021:
022: import java.io.Serializable;
023:
024: import org.continuent.hedera.common.Member;
025: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
026: import org.continuent.sequoia.common.i18n.Translate;
027: import org.continuent.sequoia.common.log.Trace;
028: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
029: import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
030:
031: /**
032: * This class defines a InitiateRecoveryLogResync message used to send
033: * parameters needed to initiate an automated recovery log synchronization. The
034: * message sends back the delta to apply to entries id to be inserted in the
035: * local recovery log.
036: *
037: * @author <a href="mailto:emmanuel.cecchet@continuent.com">Emmanuel Cecchet</a>
038: * @version 1.0
039: */
040: public class InitiateRecoveryLogResync extends
041: DistributedVirtualDatabaseMessage {
042: private static final long serialVersionUID = -6799114439172152L;
043:
044: private long originCommonCheckpointId;
045: private String commonCheckpointName;
046: private String nowCheckpointName;
047: private long nbOfEntriesToResync;
048:
049: /**
050: * Creates a new <code>InitiateRecoveryLogResync</code> object
051: *
052: * @param commonCheckpointName common checkpoint name where resync will start
053: * @param commonCheckpointId common checkpoint id on the original node (the
054: * one that is up-to-date)
055: * @param nowCheckpointName newly inserted checkpoint where resync will end
056: * @param nbOfEntriesToResync number of entries to be resynchronized between
057: * these 2 checkpoints
058: */
059: public InitiateRecoveryLogResync(String commonCheckpointName,
060: long commonCheckpointId, String nowCheckpointName,
061: long nbOfEntriesToResync) {
062: this .commonCheckpointName = commonCheckpointName;
063: this .originCommonCheckpointId = commonCheckpointId;
064: this .nowCheckpointName = nowCheckpointName;
065: this .nbOfEntriesToResync = nbOfEntriesToResync;
066: }
067:
068: /**
069: * Returns the commonCheckpointId value.
070: *
071: * @return Returns the commonCheckpointId.
072: */
073: public final long getOriginCommonCheckpointId() {
074: return originCommonCheckpointId;
075: }
076:
077: /**
078: * Returns the commonCheckpointName value.
079: *
080: * @return Returns the commonCheckpointName.
081: */
082: public final String getCommonCheckpointName() {
083: return commonCheckpointName;
084: }
085:
086: /**
087: * Returns the nbOfEntriesToResync value.
088: *
089: * @return Returns the nbOfEntriesToResync.
090: */
091: public final long getNbOfEntriesToResync() {
092: return nbOfEntriesToResync;
093: }
094:
095: /**
096: * Returns the nowCheckpointName value.
097: *
098: * @return Returns the nowCheckpointName.
099: */
100: public final String getNowCheckpointName() {
101: return nowCheckpointName;
102: }
103:
104: /**
105: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageSingleThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
106: * org.continuent.hedera.common.Member)
107: */
108: public Object handleMessageSingleThreaded(
109: DistributedVirtualDatabase dvdb, Member sender) {
110: if (!dvdb.hasRecoveryLog())
111: return new VirtualDatabaseException(Translate
112: .get("virtualdatabase.no.recovery.log"));
113:
114: return this ;
115: }
116:
117: /**
118: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageMultiThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
119: * org.continuent.hedera.common.Member, java.lang.Object)
120: */
121: public Serializable handleMessageMultiThreaded(
122: DistributedVirtualDatabase dvdb, Member sender,
123: Object handleMessageSingleThreadedResult) {
124: if (!dvdb.hasRecoveryLog())
125: return new VirtualDatabaseException(Translate
126: .get("virtualdatabase.no.recovery.log"));
127:
128: Trace logger = dvdb.getLogger();
129: try {
130: RecoveryLog recoveryLog = dvdb.getRequestManager()
131: .getRecoveryLog();
132: long commonCheckpointId = recoveryLog
133: .getCheckpointLogId(getCommonCheckpointName());
134: long nowCheckpointId = recoveryLog
135: .getCheckpointLogId(getNowCheckpointName());
136:
137: // Compute how much additional space is needed to resync the log
138: long shift = getNbOfEntriesToResync()
139: - (nowCheckpointId - commonCheckpointId);
140:
141: // Shift entries only if we do not have enough space
142: if (shift > 0)
143: recoveryLog.moveEntries(nowCheckpointId, shift);
144:
145: // Cleanup entries in the log hole that will be resynced
146: recoveryLog.deleteLogEntriesAndCheckpointBetween(
147: commonCheckpointId, nowCheckpointId + shift);
148:
149: // Update now checkpoint
150: recoveryLog.removeCheckpoint(nowCheckpointName);
151: recoveryLog.storeCheckpoint(nowCheckpointName,
152: nowCheckpointId + shift);
153:
154: return new Long(commonCheckpointId
155: - getOriginCommonCheckpointId());
156: } catch (Exception e) {
157: logger
158: .error(
159: "Unable to initialize recovery log resynchronization",
160: e);
161: return new VirtualDatabaseException(
162: "Unable to initialize recovery log resynchronization",
163: e);
164: }
165: }
166:
167: }
|