/**
 * Sequoia: Database clustering technology.
 * Copyright (C) 2002-2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
 * Copyright (C) 2005-2006 Continuent, Inc.
 * Contact: sequoia@continuent.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Initial developer(s): Nicolas Modrzyk.
 * Contributor(s): Emmanuel Cecchet.
 */

package org.continuent.sequoia.controller.recoverylog;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import javax.management.ObjectName;

import org.continuent.sequoia.common.i18n.Translate;
import org.continuent.sequoia.common.jmx.JmxConstants;
import org.continuent.sequoia.common.jmx.management.BackendState;
import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList;
import org.continuent.sequoia.common.log.Trace;
import org.continuent.sequoia.controller.backend.DatabaseBackend;
import org.continuent.sequoia.controller.jmx.MBeanServerManager;
import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.BeginTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.KillThreadTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
import org.continuent.sequoia.controller.requestmanager.RequestManager;
import org.continuent.sequoia.controller.requests.AbstractRequest;
import org.continuent.sequoia.controller.requests.StoredProcedure;
import org.continuent.sequoia.controller.scheduler.AbstractScheduler;

/**
 * This class defines a RecoverThread that is in charge of replaying the
 * recovery log on a given backend to re-synchronize it with the other nodes of
 * the cluster.
 *
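 * A minimal usage sketch (hypothetical calling code; assumes an already
 * configured scheduler, recovery log, backend and request manager):
 *
 * <pre>
 * RecoverThread recoverThread = new RecoverThread(scheduler, recoveryLog,
 *     backend, requestManager, checkpointName);
 * recoverThread.start();
 * recoverThread.join(); // wait for the replay to complete
 * if (recoverThread.getException() != null) {
 *   // recovery failed, the backend has been disabled
 * }
 * </pre>
 *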
 * @author <a href="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
 * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
 * @version 1.0
 */
public class RecoverThread extends Thread {
  static Trace logger = Trace.getLogger(RecoverThread.class.getName());
  /** end user logger */
  static Trace endUserLogger = Trace.getLogger("org.continuent.sequoia.enduser");
  private RecoveryLog recoveryLog;
  private DatabaseBackend backend;
  private RequestManager requestManager;

  // an exception thrown while recovering
  private SQLException exception;

  /**
   * a List<Long> of persistent connection IDs that are re-opened during
   * recovery
   */
  private List persistentConnections;

  /**
   * HashMap of transaction IDs which are replayed during recovery (key is
   * transaction id, value is login)
   */
  private HashMap tids;

  /**
   * The scheduler used to suspend writes during the recovery process
   */
  private AbstractScheduler scheduler;

  private String checkpointName;

  /** Size of the pendingRecoveryTasks queue used during recovery */
  private int recoveryBatchSize;

  /**
   * Creates a new <code>RecoverThread</code> object
   *
   * @param scheduler the currently used scheduler
   * @param recoveryLog Recovery log that creates this thread
   * @param backend database backend for logging
   * @param requestManager the request manager to use for recovery
   * @param checkpointName the checkpoint from which the recovery is started
   */
  public RecoverThread(AbstractScheduler scheduler, RecoveryLog recoveryLog,
      DatabaseBackend backend, RequestManager requestManager,
      String checkpointName) {
    super("RecoverThread for backend " + backend.getName());
    this.scheduler = scheduler;
    this.recoveryLog = recoveryLog;
    this.backend = backend;
    this.requestManager = requestManager;
    this.checkpointName = checkpointName;
    this.recoveryBatchSize = recoveryLog.getRecoveryBatchSize();
    tids = new HashMap();
    persistentConnections = new ArrayList();
  }

  /**
   * Returns the exception that occurred during recovery, if any.
   *
   * @return the exception, or <code>null</code> if no error occurred
   */
  public SQLException getException() {
    return exception;
  }

  /**
   * @see java.lang.Runnable#run()
   */
  public void run() {
    backend.setState(BackendState.REPLAYING);
    try {
      if (!backend.isInitialized())
        backend.initializeConnections();
    } catch (SQLException e) {
      recoveryFailed(e);
      return;
    }
    // notify the recovery log that a new recovery is about to begin
    recoveryLog.beginRecovery();

    // Get the checkpoint from the recovery log
    long logIdx;
    try {
      logIdx = recoveryLog.getCheckpointLogId(checkpointName);
    } catch (SQLException e) {
      recoveryLog.endRecovery();
      String msg = Translate.get("recovery.cannot.get.checkpoint", e);
      logger.error(msg, e);
      recoveryFailed(new SQLException(msg));
      return;
    }

    try {
      startRecovery();

      logger.info(Translate.get("recovery.start.process"));

      // Play write queries from the recovery log until the last entry or the
      // first executing entry
      LinkedList pendingRecoveryTasks = new LinkedList();
      try {
        logIdx = recover(logIdx, pendingRecoveryTasks);
      } catch (EndOfRecoveryLogException e) {
        logIdx = e.getLogIdx();
      }

      requestManager.suspendActivity();

      /*
       * We need to make sure that the logger thread queue has been flushed to
       * the database, so we need a synchronization event that guarantees this
       * has happened. The getCheckpointLogId method posts a new object in the
       * logger thread queue and waits for it to be processed before returning
       * the result. We don't care about the result (the call is even expected
       * to throw an exception), but at least we are sure that the whole queue
       * has been flushed to disk.
       */
      try {
        recoveryLog.getCheckpointLogId(
            "Just a big hack to synchronize the logger thread queue. Expected to fail ...");
      } catch (SQLException ignore) {
      }

      // Replay the remaining writes that were pending and that have now been
      // logged
      boolean replayedAllLog = false;
      do { // Loop until the whole recovery log has been replayed
        try {
          logIdx = recover(logIdx, pendingRecoveryTasks);
          // The status update for the last request (probably a commit or
          // rollback) may not be there yet. Wait for it to be flushed to the
          // log and retry.
          try {
            recoveryLog.getCheckpointLogId(
                "Just a big hack to synchronize the logger thread queue. Expected to fail ...");
          } catch (SQLException ignore) {
          }
        } catch (EndOfRecoveryLogException e) {
          replayedAllLog = true;
        }
      } while (!replayedAllLog);
      waitForAllTasksCompletion(pendingRecoveryTasks);
    } catch (SQLException e) {
      recoveryFailed(e);
      // Resume writes, transactions and persistent connections
      requestManager.resumeActivity();
      return;
    } finally {
      endRecovery();
    }

    // Now enable the backend
    try {
      requestManager.getLoadBalancer().enableBackend(backend, true);
    } catch (SQLException e) {
      recoveryFailed(e);
      return;
    } finally {
      // Resume writes, transactions and persistent connections
      requestManager.resumeActivity();
    }
    logger.info(Translate.get("backend.state.enabled", backend.getName()));
  }

  /**
   * Unset the last known checkpoint and set the backend to the disabled state.
   * This should be called when the recovery has failed.
   *
   * @param e cause of the recovery failure
   */
  private void recoveryFailed(SQLException e) {
    this.exception = e;

    if (scheduler.isSuspendedWrites())
      scheduler.resumeWrites();

    backend.setLastKnownCheckpoint(null);
    backend.setState(BackendState.DISABLED);
    try {
      backend.finalizeConnections();
    } catch (SQLException ignore) {
    }
    backend.notifyJmxError(
        SequoiaNotificationList.VIRTUALDATABASE_BACKEND_REPLAYING_FAILED, e);
  }

  /**
   * Replay the recovery log from the given logIdx index. Note that
   * startRecovery() must have been called to fork and start the
   * BackendWorkerThread before calling recover(). endRecovery() must be called
   * after recover() to terminate the thread.
   *
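   * A sketch of the expected call sequence, mirroring what run() does (the
   * pendingTasks name is illustrative only):
   *
   * <pre>
   * startRecovery();
   * LinkedList pendingTasks = new LinkedList();
   * try {
   *   logIdx = recover(logIdx, pendingTasks);
   * } catch (EndOfRecoveryLogException e) {
   *   logIdx = e.getLogIdx(); // reached the end of the recovery log
   * }
   * waitForAllTasksCompletion(pendingTasks);
   * endRecovery();
   * </pre>
   *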
   * @param logIdx logIdx used to start the recovery
   * @param pendingRecoveryTasks list of recovery tasks currently executing
   * @return last logIdx that was replayed.
   * @throws SQLException if the replay fails
   * @see #startRecovery()
   * @see #endRecovery()
   */
  private long recover(long logIdx, LinkedList pendingRecoveryTasks)
      throws SQLException, EndOfRecoveryLogException {
    RecoveryTask recoveryTask = null;
    AbstractTask abstractTask = null;

    Long tid = null;
    long previousRemaining = 0;
    // Replay the whole log
    do {
      try {
        recoveryTask = recoveryLog.recoverNextRequest(logIdx, scheduler);
      } catch (SQLException e) {
        // Signal end of recovery and kill worker thread
        recoveryLog.endRecovery();
        addWorkerTask(new KillThreadTask(1, 1));
        String msg = Translate.get("recovery.cannot.recover.from.index", e);
        logger.error(msg, e);
        throw new SQLException(msg);
      }
      if (recoveryTask == null)
        throw new EndOfRecoveryLogException(logIdx);

      abstractTask = recoveryTask.getTask();
      if (abstractTask == null)
        throw new SQLException(
            "Unexpected null abstract task in recovery task " + recoveryTask);

      if (LogEntry.EXECUTING.equals(recoveryTask.getStatus())) {
        // Ok, wait for current tasks to complete and notify the recovery that
        // we stopped on this entry
        break;
      }

      // Ignore failed queries unless they are stored procedures that could
      // have some side effect
      if (!LogEntry.SUCCESS.equals(recoveryTask.getStatus())) {
        if (!(abstractTask.getRequest() instanceof StoredProcedure)) {
          logIdx++;
          continue;
        }
      }
      if ((logIdx % 1000) == 0) {
        long remaining = recoveryLog.getCurrentLogId() - logIdx;
        endUserLogger.info("Recovering log entry " + logIdx
            + ", remaining entries: " + remaining);
        if (previousRemaining > 0 && remaining > previousRemaining) {
          endUserLogger.warn("Recovery falling behind, pending requests = "
              + pendingRecoveryTasks.size());
        }
        previousRemaining = remaining;
      }
      if (abstractTask.isPersistentConnection()) {
        long cid = abstractTask.getPersistentConnectionId();
        if (abstractTask instanceof OpenPersistentConnectionTask)
          persistentConnections.add(new Long(cid));
        else if (abstractTask instanceof ClosePersistentConnectionTask)
          persistentConnections.remove(new Long(cid));
        else if (!persistentConnections.contains(new Long(cid))) {
          /*
           * If the task's persistent connection id does not have a
           * corresponding connection opening (it is not in the persistent
           * connections list), then this task has already been played when the
           * backend was disabled, so we can skip it.
           *
           * Note that if the task is a BeginTask, skipping the begin will skip
           * all requests in the transaction, which is the expected behavior on
           * a persistent connection (the transaction was played before the
           * connection was closed, i.e. before the backend was disabled).
           */
          logIdx++;
          continue;
        }
      }

      // Used to retrieve login and persistent connection id
      AbstractRequest request = null;
      if (!abstractTask.isAutoCommit()) {
        tid = new Long(recoveryTask.getTid());
        if (abstractTask instanceof BeginTask) {
          if (tids.containsKey(tid)) {
            // Skip multiple begins of the same transaction if they exist (this
            // is possible!)
            logIdx++;
            continue;
          }
          tids.put(tid, abstractTask.getRequest());
        } else {
          request = (AbstractRequest) tids.get(tid);
          if (request == null) {
            /*
             * If the task's transaction id does not have a corresponding begin
             * (it is not in the tids list), then this task has already been
             * played when the backend was disabled, so we can skip it.
             */
            logIdx++;
            continue;
          }
          if (abstractTask instanceof RollbackTask) {
            // Override login in case it was logged with UNKNOWN_USER
            ((RollbackTask) abstractTask).getTransactionMetaData()
                .setLogin(request.getLogin());
          }
          // Restore persistent connection id information
          abstractTask.setPersistentConnection(request.isPersistentConnection());
          abstractTask
              .setPersistentConnectionId(request.getPersistentConnectionId());
        }
      } // else autocommit ok

      if ((abstractTask instanceof CommitTask)
          || (abstractTask instanceof RollbackTask)) {
        tids.remove(tid);
      }

      logIdx = recoveryTask.getId();
      // Add the task for execution by the BackendWorkerThread
      addWorkerTask(abstractTask);

      // Add it to the list of currently executing tasks
      pendingRecoveryTasks.addLast(recoveryTask);

      do {
        // Now let's check which tasks have completed and remove them from the
        // pending queue.
        for (Iterator iter = pendingRecoveryTasks.iterator(); iter.hasNext();) {
          recoveryTask = (RecoveryTask) iter.next();
          abstractTask = recoveryTask.getTask();
          if (abstractTask.hasFullyCompleted()) {
            // Task has completed, remove it from the list
            iter.remove();

            // Only deal with successful tasks
            if (LogEntry.SUCCESS.equals(recoveryTask.getStatus())) {
              if (abstractTask.getFailed() > 0) {
                // We failed to recover this task. Signal end of recovery and
                // kill the worker thread.
                String msg;
                if (abstractTask.isAutoCommit())
                  msg = Translate.get("recovery.failed.with.error",
                      new Object[]{abstractTask,
                          ((Exception) abstractTask.getExceptions().get(0))
                              .getMessage()});
                else
                  msg = Translate.get("recovery.failed.with.error.transaction",
                      new Object[]{
                          Long.toString(abstractTask.getTransactionId()),
                          abstractTask,
                          ((Exception) abstractTask.getExceptions().get(0))
                              .getMessage()});
                recoveryLog.endRecovery();
                addWorkerTask(new KillThreadTask(1, 1));
                pendingRecoveryTasks.clear();
                logger.error(msg);
                throw new SQLException(msg);
              }
            }
          }
        }

        /*
         * Transactions and persistent connections are limited by the number of
         * pending requests in the backend total order queue: only one request
         * per transaction or persistent connection is moved from the total
         * order queue to the task queues. When all the requests are auto
         * commit, we need to limit the number of pending requests here,
         * otherwise the conflicting queue could grow indefinitely.
         */
        if (tids.isEmpty() && persistentConnections.isEmpty()
            && pendingRecoveryTasks.size() >= recoveryBatchSize)
          try {
            recoveryTask = (RecoveryTask) pendingRecoveryTasks.getFirst();
            abstractTask = recoveryTask.getTask();
            synchronized (abstractTask) {
              if (!abstractTask.hasFullyCompleted())
                abstractTask.wait();
            }
          } catch (InterruptedException e) {
            break;
          }
        else
          break;
      } while (true);
    } while (logIdx != -1); // while we have not reached the last query

    return logIdx;
  }

  /**
   * Wait for all tasks in the given list to complete. Note that endRecovery()
   * is called upon failure.
   *
   * @param pendingRecoveryTasks list of <code>RecoveryTask</code> currently
   *          executing tasks
   * @throws SQLException if a failure occurs
   */
  private void waitForAllTasksCompletion(LinkedList pendingRecoveryTasks)
      throws SQLException {
    RecoveryTask recoveryTask;
    AbstractTask abstractTask;

    while (!pendingRecoveryTasks.isEmpty()) {
      recoveryTask = (RecoveryTask) pendingRecoveryTasks.removeFirst();
      abstractTask = recoveryTask.getTask();
      synchronized (abstractTask) {
        // Wait for task completion if needed
        while (!abstractTask.hasFullyCompleted())
          try {
            abstractTask.wait();
          } catch (InterruptedException ignore) {
          }

        // Only deal with successful tasks
        if (LogEntry.SUCCESS.equals(recoveryTask.getStatus())) {
          if (abstractTask.getFailed() > 0) {
            // We failed to recover this task. Signal end of recovery and kill
            // the worker thread.
            recoveryLog.endRecovery();
            addWorkerTask(new KillThreadTask(1, 1));
            pendingRecoveryTasks.clear();
            String msg;
            if (abstractTask.isAutoCommit())
              msg = Translate.get("recovery.failed.with.error",
                  new Object[]{abstractTask,
                      ((Exception) abstractTask.getExceptions().get(0))
                          .getMessage()});
            else
              msg = Translate.get("recovery.failed.with.error.transaction",
                  new Object[]{Long.toString(abstractTask.getTransactionId()),
                      abstractTask,
                      ((Exception) abstractTask.getExceptions().get(0))
                          .getMessage()});
            logger.error(msg);
            throw new SQLException(msg);
          }
        }
      }
    }
  }

  /**
   * Add a task to a DatabaseBackend using the proper synchronization.
   *
   * @param task the task to add to the backend total order queue
   */
  private void addWorkerTask(AbstractTask task) {
    backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task,
        recoveryBatchSize);
  }

  /**
   * Properly end the recovery and kill the worker thread used for recovery if
   * it exists.
   *
   * @see #startRecovery()
   */
  private void endRecovery() {
    // We are done with the recovery
    logger.info(Translate.get("recovery.process.complete"));
    backend.terminateWorkerThreads();

    recoveryLog.endRecovery();
  }

  /**
   * Start the recovery process by forking a BackendWorkerThread. <br />
   * You must call endRecovery() to terminate the thread.
   * <p>
   * When starting the recovery, we create a new BackendTaskQueues for the
   * backend, but only its non-conflicting queue will be used.<br />
   * We also use only one BackendWorkerThread to ensure that the requests will
   * be replayed serially, in the same order they were logged.
   * </p>
   * <p>
   * A new BackendTaskQueues will be set on the backend when it is enabled in
   * the endRecovery() method.
   * </p>
   *
   * @see #endRecovery()
   * @see #addWorkerTask(AbstractTask)
   */
  private void startRecovery() {
    try {
      ObjectName taskQueuesObjectName = JmxConstants
          .getBackendTaskQueuesObjectName(backend.getVirtualDatabaseName(),
              backend.getName());
      if (MBeanServerManager.getInstance().isRegistered(taskQueuesObjectName)) {
        MBeanServerManager.unregister(taskQueuesObjectName);
      }
    } catch (Exception e) {
      if (logger.isWarnEnabled()) {
        logger.warn("Exception while unregistering backend task queues mbean",
            e);
      }
    }
    // Find the correct enforceTableLocking option for this backend
    boolean enforceTableLocking = requestManager.getLoadBalancer()
        .waitForCompletionPolicy.isEnforceTableLocking();
    backend.setTaskQueues(new BackendTaskQueues(backend,
        new WaitForCompletionPolicy(WaitForCompletionPolicy.FIRST,
            enforceTableLocking, 0), requestManager));
    backend.startWorkerThreads(requestManager.getLoadBalancer());
  }

  /**
   * Used to signal that we have reached the end of the recovery log during the
   * recovery process. There are other conditions that interrupt the recovery
   * process, such as finding a request that is still executing; in such cases
   * this exception is not thrown.
   */
  private class EndOfRecoveryLogException extends Exception {
    private static final long serialVersionUID = 2826202288239306426L;
    private long logIdx;

    /**
     * Creates a new <code>EndOfRecoveryLogException</code> object
     *
     * @param logIdx recovery log index we stopped at
     */
    public EndOfRecoveryLogException(long logIdx) {
      this.logIdx = logIdx;
    }

    /**
     * Return the last recovery log index reached
     *
     * @return last recovery log index
     */
    public long getLogIdx() {
      return logIdx;
    }
  }

}