001: /**
002: * Sequoia: Database clustering technology.
003: * Copyright (C) 2005 Emic Networks.
004: * Contact: sequoia@continuent.org
005: *
006: *
007: * Licensed under the Apache License, Version 2.0 (the "License");
008: * you may not use this file except in compliance with the License.
009: * You may obtain a copy of the License at
010: *
011: * http://www.apache.org/licenses/LICENSE-2.0
012: *
013: * Unless required by applicable law or agreed to in writing, software
014: * distributed under the License is distributed on an "AS IS" BASIS,
015: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: * See the License for the specific language governing permissions and
017: * limitations under the License.
018: *
019: * Initial developer(s): Olivier Fambon.
020: * Contributor(s): ______________________.
021: */package org.continuent.sequoia.controller.virtualdatabase.protocol;
022:
023: import java.io.IOException;
024: import java.io.Serializable;
025: import java.sql.SQLException;
026: import java.util.ArrayList;
027:
028: import org.continuent.hedera.adapters.MulticastRequestAdapter;
029: import org.continuent.hedera.adapters.MulticastResponse;
030: import org.continuent.hedera.common.Member;
031: import org.continuent.sequoia.common.exceptions.VirtualDatabaseException;
032: import org.continuent.sequoia.common.i18n.Translate;
033: import org.continuent.sequoia.common.log.Trace;
034: import org.continuent.sequoia.controller.recoverylog.RecoveryLog;
035: import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
036: import org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase;
037:
038: /**
039: * This message is used attempt an automatic recovery log resynchronization when
040: * a vdb is loaded on a controller that is not the first in it's group. Upon
041: * reception of this message, the 'mother' vdb checks to see if the specified
042: * checkpoint exists. If not, the call fails, and the sender should try another
043: * chekpoint name. If the specified checkpoint exists, a 'now' checkpoint is set
044: * (cluster-wide) and the call returns with this object containing the name of
045: * this checkpoint and the size of the chunck or LogEntries that it will send.
046: *
047: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.ReplicateLogEntries
048: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.CopyLogEntry
049: * @author <a href="mailto:Olivier.Fambon@continuent.com>Olivier Fambon </a>
050: * @version 1.0
051: */
052: public class ResyncRecoveryLog extends
053: DistributedVirtualDatabaseMessage {
054: private static final long serialVersionUID = 3246850782028970719L;
055:
056: private String checkpointName;
057:
058: /**
059: * Creates a new <code>ResyncRecoveryLog</code> message
060: *
061: * @param checkpointName The checkpoint from which to resync the log.
062: */
063: public ResyncRecoveryLog(String checkpointName) {
064: this .checkpointName = checkpointName;
065: }
066:
067: /**
068: * Returns the checkpointName value.
069: *
070: * @return Returns the 'now' checkpointName.
071: */
072: public String getCheckpointName() {
073: return checkpointName;
074: }
075:
076: /**
077: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageSingleThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
078: * org.continuent.hedera.common.Member)
079: */
080: public Object handleMessageSingleThreaded(
081: DistributedVirtualDatabase dvdb, Member sender) {
082: return null;
083: }
084:
085: /**
086: * @see org.continuent.sequoia.controller.virtualdatabase.protocol.DistributedVirtualDatabaseMessage#handleMessageMultiThreaded(org.continuent.sequoia.controller.virtualdatabase.DistributedVirtualDatabase,
087: * org.continuent.hedera.common.Member, java.lang.Object)
088: */
089: public Serializable handleMessageMultiThreaded(
090: DistributedVirtualDatabase dvdb, Member sender,
091: Object handleMessageSingleThreadedResult) {
092: if (!dvdb.hasRecoveryLog())
093: return new VirtualDatabaseException(Translate
094: .get("virtualdatabase.no.recovery.log"));
095:
096: RecoveryLog recoveryLog = dvdb.getRequestManager()
097: .getRecoveryLog();
098:
099: long commonCheckpointId;
100: try {
101: commonCheckpointId = recoveryLog
102: .getCheckpointLogId(checkpointName);
103: } catch (SQLException e) {
104: return new VirtualDatabaseException(
105: "Unable to retrieve checkpoint " + checkpointName,
106: e);
107: }
108:
109: /* set a global 'now' checkpoint (temporary)
110: * When this request completes all activity on the system is
111: * suspended and must be resumed by calling RequestManager.resumeActivity()
112: */
113: String nowCheckpointName;
114: try {
115: nowCheckpointName = dvdb
116: .setLogReplicationCheckpoint(sender);
117: } catch (VirtualDatabaseException e) {
118: return e;
119: }
120:
121: // get it's id (ewerk) so that we can replicate it on the other side
122: long nowCheckpointId;
123: Trace logger = dvdb.getLogger();
124: try {
125: nowCheckpointId = recoveryLog
126: .getCheckpointLogId(nowCheckpointName);
127: } catch (SQLException e) {
128: dvdb.getRequestManager().resumeActivity();
129: String errorMessage = "Cannot find 'now checkpoint' log entry";
130: logger.error(errorMessage);
131: return new VirtualDatabaseException(errorMessage);
132: }
133:
134: // Compute the number of entries to be replayed and send it to the remote
135: // controller so that it can allocate the proper number of entries in its
136: // recovery log
137: long nbOfEntriesToResync = nowCheckpointId - commonCheckpointId;
138: long diff;
139: try {
140: Serializable replyValue = dvdb.sendMessageToController(
141: sender, new InitiateRecoveryLogResync(
142: checkpointName, commonCheckpointId,
143: nowCheckpointName, nbOfEntriesToResync),
144: dvdb.getMessageTimeouts()
145: .getReplicateLogEntriesTimeout());
146: if (replyValue instanceof Long)
147: diff = ((Long) replyValue).longValue();
148: else
149: throw new RuntimeException(
150: "Invalid answer from remote controller on InitiateRecoveryLogResync ("
151: + replyValue + ")");
152: } catch (Exception e) {
153: String errorMessage = "Failed to initialize recovery log resynchronization";
154: logger.error(errorMessage, e);
155: return new VirtualDatabaseException(errorMessage, e);
156: } finally {
157: dvdb.getRequestManager().resumeActivity();
158: }
159:
160: logger.info("Resynchronizing from checkpoint " + checkpointName
161: + " (" + commonCheckpointId + ") to checkpoint "
162: + nowCheckpointName + " (" + nowCheckpointId + ")");
163:
164: // protect from concurrent log updates: fake a recovery (increments
165: // semaphore)
166: recoveryLog.beginRecovery();
167:
168: // copy the entries over to the remote controller.
169: // Send them one by one over to the remote controller, coz each LogEntry can
170: // potentially be huge (e.g. if it contains a blob)
171: try {
172: ArrayList dest = new ArrayList();
173: dest.add(sender);
174: long copyLogEntryTimeout = dvdb.getMessageTimeouts()
175: .getCopyLogEntryTimeout();
176: for (long id = commonCheckpointId; id < nowCheckpointId; id++) {
177: LogEntry entry = recoveryLog.getNextLogEntry(id);
178: if (entry == null) {
179: String errorMessage = "Cannot find expected log entry: "
180: + id;
181: logger.error(errorMessage);
182: return new VirtualDatabaseException(errorMessage);
183: }
184:
185: // Because 'getNextLogEntry()' will hunt for the next valid log entry,
186: // we need to update the iterator with the new id value - 1
187: id = entry.getLogId() - 1;
188: entry.setLogId(entry.getLogId() + diff);
189:
190: MulticastResponse resp = dvdb
191: .getMulticastRequestAdapter().multicastMessage(
192: dest, new CopyLogEntry(entry),
193: MulticastRequestAdapter.WAIT_NONE,
194: copyLogEntryTimeout);
195: if (resp.getFailedMembers() != null)
196: throw new IOException(
197: "Failed to deliver log entry " + id
198: + " to remote controller " + sender);
199: }
200: } catch (Exception e) {
201: String errorMessage = "Failed to complete recovery log resynchronization";
202: logger.error(errorMessage, e);
203: return new VirtualDatabaseException(errorMessage, e);
204: } finally {
205: recoveryLog.endRecovery(); // release semaphore
206: }
207:
208: // Now check that no entry was missed by the other controller since we
209: // shipped all entries asynchronously without getting any individual ack
210: // (much faster to address SEQUOIA-504)
211: try {
212: long localNbOfLogEntries = recoveryLog
213: .getNumberOfLogEntries(commonCheckpointId,
214: nowCheckpointId);
215:
216: if (logger.isDebugEnabled())
217: logger
218: .debug("Checking that "
219: + localNbOfLogEntries
220: + " entries were replicated on remote controller");
221:
222: Serializable replyValue = dvdb.sendMessageToController(
223: sender, new CompleteRecoveryLogResync(
224: checkpointName, nowCheckpointName,
225: localNbOfLogEntries), dvdb
226: .getMessageTimeouts()
227: .getReplicateLogEntriesTimeout());
228: if (replyValue instanceof Long) {
229: diff = ((Long) replyValue).longValue();
230: if (diff != 0)
231: return new VirtualDatabaseException(
232: "Recovery log resynchronization reports a difference of "
233: + diff + " entries");
234: } else
235: throw new RuntimeException(
236: "Invalid answer from remote controller on CompleteRecoveryLogResync ("
237: + replyValue + ")");
238: } catch (Exception e) {
239: String errorMessage = "Failed to initialize recovery log resynchronization";
240: logger.error(errorMessage, e);
241: return new VirtualDatabaseException(errorMessage, e);
242: }
243:
244: return null;
245: }
246:
247: }
|