RecoverThread.java (sequoia 2.10.9, package org.continuent.sequoia.controller.recoverylog)



/**
 * Sequoia: Database clustering technology.
 * Copyright (C) 2002-2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
 * Copyright (C) 2005-2006 Continuent, Inc.
 * Contact: sequoia@continuent.org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Initial developer(s): Nicolas Modrzyk.
 * Contributor(s): Emmanuel Cecchet.
 */

package org.continuent.sequoia.controller.recoverylog;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import javax.management.ObjectName;

import org.continuent.sequoia.common.i18n.Translate;
import org.continuent.sequoia.common.jmx.JmxConstants;
import org.continuent.sequoia.common.jmx.management.BackendState;
import org.continuent.sequoia.common.jmx.notifications.SequoiaNotificationList;
import org.continuent.sequoia.common.log.Trace;
import org.continuent.sequoia.controller.backend.DatabaseBackend;
import org.continuent.sequoia.controller.jmx.MBeanServerManager;
import org.continuent.sequoia.controller.loadbalancer.BackendTaskQueues;
import org.continuent.sequoia.controller.loadbalancer.policies.WaitForCompletionPolicy;
import org.continuent.sequoia.controller.loadbalancer.tasks.AbstractTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.BeginTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.ClosePersistentConnectionTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.CommitTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.KillThreadTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.OpenPersistentConnectionTask;
import org.continuent.sequoia.controller.loadbalancer.tasks.RollbackTask;
import org.continuent.sequoia.controller.recoverylog.events.LogEntry;
import org.continuent.sequoia.controller.requestmanager.RequestManager;
import org.continuent.sequoia.controller.requests.AbstractRequest;
import org.continuent.sequoia.controller.requests.StoredProcedure;
import org.continuent.sequoia.controller.scheduler.AbstractScheduler;

/**
 * This class defines a RecoverThread that is in charge of replaying the
 * recovery log on a given backend to re-synchronize it with the other nodes of
 * the cluster.
 *
 * @author <a href="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
 * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
 * @version 1.0
 */
public class RecoverThread extends Thread {
    static Trace logger = Trace.getLogger(RecoverThread.class.getName());
    /** end user logger */
    static Trace endUserLogger = Trace
            .getLogger("org.continuent.sequoia.enduser");
    private RecoveryLog recoveryLog;
    private DatabaseBackend backend;
    private RequestManager requestManager;

    // an exception thrown while recovering
    private SQLException exception;

    /**
     * a List&lt;Long&gt; of persistent connection IDs that are re-opened during
     * recovery
     */
    private List persistentConnections;

    /**
     * HashMap of transaction IDs which are replayed during recovery (key is
     * transaction id, value is the request that began the transaction, from
     * which the login is retrieved)
     */
    private HashMap tids;

    /**
     * The scheduler used to suspend writes during the recovery process
     */
    private AbstractScheduler scheduler;

    private String checkpointName;

    /** Size of the pendingRecoveryTasks queue used during recovery */
    private int recoveryBatchSize;

    /**
     * Creates a new <code>RecoverThread</code> object
     *
     * @param scheduler the currently used scheduler
     * @param recoveryLog Recovery log that creates this thread
     * @param backend database backend to recover
     * @param requestManager the request manager to use for recovery
     * @param checkpointName the checkpoint from which the recovery is started
     */
    public RecoverThread(AbstractScheduler scheduler,
            RecoveryLog recoveryLog, DatabaseBackend backend,
            RequestManager requestManager, String checkpointName) {
        super("RecoverThread for backend " + backend.getName());
        this.scheduler = scheduler;
        this.recoveryLog = recoveryLog;
        this.backend = backend;
        this.requestManager = requestManager;
        this.checkpointName = checkpointName;
        this.recoveryBatchSize = recoveryLog.getRecoveryBatchSize();
        tids = new HashMap();
        persistentConnections = new ArrayList();
    }

    /**
     * Returns the exception value.
     *
     * @return Returns the exception.
     */
    public SQLException getException() {
        return exception;
    }

    /**
     * @see java.lang.Runnable#run()
     */
    public void run() {
        backend.setState(BackendState.REPLAYING);
        try {
            if (!backend.isInitialized())
                backend.initializeConnections();
        } catch (SQLException e) {
            recoveryFailed(e);
            return;
        }
        // notify the recovery log that a new
        // recovery is about to begin
        recoveryLog.beginRecovery();

        // Get the checkpoint from the recovery log
        long logIdx;
        try {
            logIdx = recoveryLog.getCheckpointLogId(checkpointName);
        } catch (SQLException e) {
            recoveryLog.endRecovery();
            String msg = Translate.get(
                    "recovery.cannot.get.checkpoint", e);
            logger.error(msg, e);
            recoveryFailed(new SQLException(msg));
            return;
        }

        try {
            startRecovery();

            logger.info(Translate.get("recovery.start.process"));

            // Play write queries from the recovery log until the last entry or the
            // first executing entry
            LinkedList pendingRecoveryTasks = new LinkedList();
            try {
                logIdx = recover(logIdx, pendingRecoveryTasks);
            } catch (EndOfRecoveryLogException e) {
                logIdx = e.getLogIdx();
            }

            requestManager.suspendActivity();

            /*
             * We need to make sure that the logger thread queue has been flushed to
             * the database, so we need a synchronization event that will make sure
             * that this happens. The getCheckpointLogId method posts a new object in
             * the logger thread queue and waits for it to be processed before
             * returning the result. We don't care about the result (the call is even
             * expected to throw an exception), but at least we are sure that the
             * whole queue has been flushed to disk.
             */
            try {
                recoveryLog
                        .getCheckpointLogId("Just a big hack to synchronize the logger thread queue. Expected to fail ...");
            } catch (SQLException ignore) {
                // Expected: this checkpoint name does not exist
            }

            // Play the remaining writes that were pending and which have been logged
            boolean replayedAllLog = false;
            do { // Loop until the whole recovery log has been replayed
                try {
                    logIdx = recover(logIdx, pendingRecoveryTasks);
                    // The status update for the last request (probably a commit/rollback)
                    // may not be there yet. Wait for it to be flushed to the log and
                    // retry.
                    try {
                        recoveryLog
                                .getCheckpointLogId("Just a big hack to synchronize the logger thread queue. Expected to fail ...");
                    } catch (SQLException ignore) {
                        // Expected: this checkpoint name does not exist
                    }
                } catch (EndOfRecoveryLogException e) {
                    replayedAllLog = true;
                }
            } while (!replayedAllLog);
            waitForAllTasksCompletion(pendingRecoveryTasks);
        } catch (SQLException e) {
            recoveryFailed(e);
            // Resume writes, transactions and persistent connections
            requestManager.resumeActivity();
            return;
        } finally {
            endRecovery();
        }

        // Now enable it
        try {
            requestManager.getLoadBalancer().enableBackend(backend,
                    true);
        } catch (SQLException e) {
            recoveryFailed(e);
            return;
        } finally {
            // Resume writes, transactions and persistent connections
            requestManager.resumeActivity();
        }
        logger.info(Translate.get("backend.state.enabled", backend
                .getName()));
    }

    /**
     * Unset the last known checkpoint and set the backend to disabled state. This
     * should be called when the recovery has failed.
     *
     * @param e cause of the recovery failure
     */
    private void recoveryFailed(SQLException e) {
        this.exception = e;

        if (scheduler.isSuspendedWrites())
            scheduler.resumeWrites();

        backend.setLastKnownCheckpoint(null);
        backend.setState(BackendState.DISABLED);
        try {
            backend.finalizeConnections();
        } catch (SQLException ignore) {
        }
        backend.notifyJmxError(
                SequoiaNotificationList.VIRTUALDATABASE_BACKEND_REPLAYING_FAILED,
                e);
    }

    /**
     * Replay the recovery log from the given logIdx index. Note that
     * startRecovery() must have been called to fork and start the
     * BackendWorkerThread before calling recover. endRecovery() must be called
     * after recover() to terminate the thread.
     *
     * @param logIdx logIdx used to start the recovery
     * @param pendingRecoveryTasks list of <code>RecoveryTask</code> objects
     *          that are currently executing
     * @return last logIdx that was replayed.
     * @throws SQLException if fails
     * @throws EndOfRecoveryLogException if there is no more entry to replay in
     *           the recovery log
     * @see #startRecovery()
     * @see #endRecovery()
     */
    private long recover(long logIdx, LinkedList pendingRecoveryTasks)
            throws SQLException, EndOfRecoveryLogException {
        RecoveryTask recoveryTask = null;
        AbstractTask abstractTask = null;

        Long tid = null;
        long previousRemaining = 0;
        // Replay the whole log
        do {
            try {
                recoveryTask = recoveryLog.recoverNextRequest(logIdx,
                        scheduler);
            } catch (SQLException e) {
                // Signal end of recovery and kill worker thread
                recoveryLog.endRecovery();
                addWorkerTask(new KillThreadTask(1, 1));
                String msg = Translate.get(
                        "recovery.cannot.recover.from.index", e);
                logger.error(msg, e);
                throw new SQLException(msg);
            }
            if (recoveryTask == null)
                throw new EndOfRecoveryLogException(logIdx);

            abstractTask = recoveryTask.getTask();
            if (abstractTask == null)
                throw new SQLException(
                        "Unexpected null abstract task in recovery task "
                                + recoveryTask);

            if (LogEntry.EXECUTING.equals(recoveryTask.getStatus())) {
                // Ok, wait for current tasks to complete and notify the recovery that
                // we stopped on this entry
                break;
            }

            if (!LogEntry.SUCCESS.equals(recoveryTask.getStatus())) {
                // Ignore failed queries unless they are stored procedures that could
                // have some side effect
                if (!(abstractTask.getRequest() instanceof StoredProcedure)) {
                    logIdx++;
                    continue;
                }
            }
            if ((logIdx % 1000) == 0) {
                long remaining = recoveryLog.getCurrentLogId() - logIdx;
                endUserLogger.info("Recovering log entry " + logIdx
                        + " remaining entries " + remaining);
                if (previousRemaining > 0
                        && remaining > previousRemaining) {
                    endUserLogger
                            .warn("Recovery falling behind pending requests ="
                                    + pendingRecoveryTasks.size());
                }
                previousRemaining = remaining;
            }
            if (abstractTask.isPersistentConnection()) {
                long cid = abstractTask.getPersistentConnectionId();
                if (abstractTask instanceof OpenPersistentConnectionTask)
                    persistentConnections.add(new Long(cid));
                else if (abstractTask instanceof ClosePersistentConnectionTask)
                    persistentConnections.remove(new Long(cid));
                else if (!persistentConnections.contains(new Long(cid))) {
                    /*
                     * If the task persistent connection id does not have a corresponding
                     * connection opening (it is not in the persistent connections list),
                     * then this task has already been played when the backend was
                     * disabled. So we can skip it.
                     *
                     * Note that if the task is a BeginTask, skipping the begin will skip
                     * all requests in the transaction which is the expected behavior on a
                     * persistent connection (transaction has been played before the
                     * connection was closed, i.e. the backend was disabled).
                     */
                    logIdx++;
                    continue;
                }
            }

            // Used to retrieve login and persistent connection id
            AbstractRequest request = null;
            if (!abstractTask.isAutoCommit()) {
                tid = new Long(recoveryTask.getTid());
                if (abstractTask instanceof BeginTask) {
                    if (tids.containsKey(tid)) {
                        // Skip multiple begins of the same transaction if exists (this is
                        // possible !!!)
                        logIdx++;
                        continue;
                    }
                    tids.put(tid, abstractTask.getRequest());
                } else {
                    request = (AbstractRequest) tids.get(tid);
                    if (request == null) {
                        /*
                         * if the task transaction id does not have a corresponding begin
                         * (it is not in the tids list), then this task has already been
                         * played when the backend was disabled. So we can skip it.
                         */
                        logIdx++;
                        continue;
                    }
                    if (abstractTask instanceof RollbackTask) {
                        // Override login in case it was logged with UNKNOWN_USER
                        ((RollbackTask) abstractTask)
                                .getTransactionMetaData().setLogin(
                                        request.getLogin());
                    }
                    // Restore persistent connection id information
                    abstractTask.setPersistentConnection(request
                            .isPersistentConnection());
                    abstractTask.setPersistentConnectionId(request
                            .getPersistentConnectionId());
                }
            } // else autocommit ok

            if ((abstractTask instanceof CommitTask)
                    || (abstractTask instanceof RollbackTask)) {
                tids.remove(tid);
            }

            logIdx = recoveryTask.getId();
            // Add the task for execution by the BackendWorkerThread
            addWorkerTask(abstractTask);

            // Add it to the list of currently executing tasks
            pendingRecoveryTasks.addLast(recoveryTask);

            do {
                // Now let's check which tasks have completed and remove them from the
                // pending queue.
                for (Iterator iter = pendingRecoveryTasks.iterator(); iter
                        .hasNext();) {
                    recoveryTask = (RecoveryTask) iter.next();
                    abstractTask = recoveryTask.getTask();
                    if (abstractTask.hasFullyCompleted()) {
                        // Task has completed, remove it from the list
                        iter.remove();

                        // Only deal with successful tasks
                        if (LogEntry.SUCCESS.equals(recoveryTask
                                .getStatus())) {

                            if (abstractTask.getFailed() > 0) {
                                // We failed to recover that task. Signal end of recovery
                                // and kill worker thread
                                String msg;
                                if (abstractTask.isAutoCommit())
                                    msg = Translate.get(
                                            "recovery.failed.with.error",
                                            new Object[] {
                                                    abstractTask,
                                                    ((Exception) abstractTask
                                                            .getExceptions()
                                                            .get(0))
                                                            .getMessage() });
                                else
                                    msg = Translate.get(
                                            "recovery.failed.with.error.transaction",
                                            new Object[] {
                                                    Long.toString(abstractTask
                                                            .getTransactionId()),
                                                    abstractTask,
                                                    ((Exception) abstractTask
                                                            .getExceptions()
                                                            .get(0))
                                                            .getMessage() });
                                recoveryLog.endRecovery();
                                addWorkerTask(new KillThreadTask(1, 1));
                                pendingRecoveryTasks.clear();
                                logger.error(msg);
                                throw new SQLException(msg);
                            }
                        }
                    }
                }

                /*
                 * Transactions and persistent connections are limited by the
                 * number of pending requests in the backend total order queue:
                 * only one request per transaction or persistent connection
                 * will be moved from the total order queue to the task queues.
                 * When all the requests are auto-commit, we need to limit the
                 * number of requests here. Otherwise, the conflicting queue can
                 * grow indefinitely large.
                 */
                if (tids.isEmpty()
                        && persistentConnections.isEmpty()
                        && pendingRecoveryTasks.size() >= recoveryBatchSize) {
                    try {
                        recoveryTask = (RecoveryTask) pendingRecoveryTasks
                                .getFirst();
                        abstractTask = recoveryTask.getTask();
                        synchronized (abstractTask) {
                            if (!abstractTask.hasFullyCompleted())
                                abstractTask.wait();
                        }
                    } catch (InterruptedException e) {
                        break;
                    }
                } else {
                    break;
                }
            } while (true);
        } while (logIdx != -1); // while we have not reached the last query

        return logIdx;
    }

    /**
     * Wait for all tasks in the given list to complete. Note that endRecovery()
     * is called upon failure.
     *
     * @param pendingRecoveryTasks list of <code>RecoveryTask</code> objects
     *          that are currently executing
     * @throws SQLException if a failure occurs
     */
488:            private void waitForAllTasksCompletion(
489:                    LinkedList pendingRecoveryTasks) throws SQLException {
490:                RecoveryTask recoveryTask;
491:                AbstractTask abstractTask;
492:
493:                while (!pendingRecoveryTasks.isEmpty()) {
494:                    recoveryTask = (RecoveryTask) pendingRecoveryTasks
495:                            .removeFirst();
496:                    abstractTask = recoveryTask.getTask();
497:                    synchronized (abstractTask) {
                // Wait for task completion if needed
                while (!abstractTask.hasFullyCompleted()) {
                    try {
                        abstractTask.wait();
                    } catch (InterruptedException ignore) {
                        // Ignored: the loop re-checks the completion condition
                    }
                }

                if (LogEntry.SUCCESS.equals(recoveryTask.getStatus())) { // Only deal with successful tasks
                    if (abstractTask.getFailed() > 0) {
                        // We failed to recover that task. Signal the end of recovery
                        // and kill the worker thread.
                        recoveryLog.endRecovery();
                        addWorkerTask(new KillThreadTask(1, 1));
                        pendingRecoveryTasks.clear();
                        String msg;
                        if (abstractTask.isAutoCommit())
                            msg = Translate.get("recovery.failed.with.error",
                                    new Object[] {
                                            abstractTask,
                                            ((Exception) abstractTask.getExceptions().get(0)).getMessage() });
                        else
                            msg = Translate.get("recovery.failed.with.error.transaction",
                                    new Object[] {
                                            Long.toString(abstractTask.getTransactionId()),
                                            abstractTask,
                                            ((Exception) abstractTask.getExceptions().get(0)).getMessage() });
                        logger.error(msg);
                        throw new SQLException(msg);
                    }
                }
            }
        }
    }

    /**
     * Add a task to a DatabaseBackend using the proper synchronization.
     *
     * @param task the task to add to the thread queue
     */
    private void addWorkerTask(AbstractTask task) {
        backend.getTaskQueues().addTaskToBackendTotalOrderQueue(task,
                recoveryBatchSize);
    }

    /**
     * Properly end the recovery and kill the worker thread used for recovery if
     * it exists.
     *
     * @see #startRecovery()
     */
    private void endRecovery() {
        // We are done with the recovery
        logger.info(Translate.get("recovery.process.complete"));
        backend.terminateWorkerThreads();

        recoveryLog.endRecovery();
    }

    /**
     * Start the recovery process by forking a BackendWorkerThread. <br />
     * You must call endRecovery() to terminate the thread.
     * <p>
     * When starting the recovery, we create a new BackendTaskQueues for the
     * backend, but only its non-conflicting queue will be used.<br />
     * We also use only one BackendWorkerThread to ensure that the requests are
     * replayed serially, in the same order they were logged.
     * </p>
     * <p>
     * A new BackendTaskQueues will be set on the backend when it is enabled in
     * the endRecovery() method.
     * </p>
     *
     * @see #endRecovery()
     * @see #addWorkerTask(AbstractTask)
     */
    private void startRecovery() {
        try {
            ObjectName taskQueuesObjectName = JmxConstants
                    .getBackendTaskQueuesObjectName(
                            backend.getVirtualDatabaseName(), backend.getName());
            if (MBeanServerManager.getInstance().isRegistered(taskQueuesObjectName)) {
                MBeanServerManager.unregister(taskQueuesObjectName);
            }
        } catch (Exception e) {
            if (logger.isWarnEnabled()) {
                logger.warn("Exception while unregistering backend task queues mbean", e);
            }
        }
        // Find the correct enforceTableLocking option for this backend
        boolean enforceTableLocking = requestManager.getLoadBalancer().waitForCompletionPolicy
                .isEnforceTableLocking();
        backend.setTaskQueues(new BackendTaskQueues(backend,
                new WaitForCompletionPolicy(WaitForCompletionPolicy.FIRST,
                        enforceTableLocking, 0), requestManager));
        backend.startWorkerThreads(requestManager.getLoadBalancer());
    }

    /**
     * Used to signal that we have reached the end of the recovery log during the
     * recovery process. Other conditions can also interrupt the recovery
     * process, such as finding a request which is still executing. In such
     * cases we will not throw this exception.
     */
    private class EndOfRecoveryLogException extends Exception {
        private static final long serialVersionUID = 2826202288239306426L;
        private long logIdx;

        /**
         * Creates a new <code>EndOfRecoveryLogException</code> object
         *
         * @param logIdx recovery log index we stopped at
         */
        public EndOfRecoveryLogException(long logIdx) {
            this.logIdx = logIdx;
        }

        /**
         * Return the last recovery log index reached
         *
         * @return last recovery log index
         */
        public long getLogIdx() {
            return logIdx;
        }
    }

}
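
The listing above relies on three cooperating ideas: a single worker thread so that requests are replayed in logged order, a wait loop on each task until it has completed, and a special "kill" task that stops the worker. The following is a minimal, self-contained sketch of that pattern using only the JDK. It is not the Sequoia implementation: `SerialReplaySketch`, `Task`, `markCompleted`, `awaitCompletion` and `replay` are hypothetical names chosen for illustration, and "executing" a statement is reduced to recording it in a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SerialReplaySketch {

    /** Hypothetical stand-in for a replayed task; not the Sequoia AbstractTask. */
    static final class Task {
        final String sql;
        final boolean kill;   // true for the task that tells the worker to stop
        private boolean done; // guarded by this task's monitor

        Task(String sql, boolean kill) {
            this.sql = sql;
            this.kill = kill;
        }

        synchronized void markCompleted() {
            done = true;
            notifyAll(); // wake up any thread blocked in awaitCompletion()
        }

        synchronized void awaitCompletion() {
            boolean interrupted = false;
            while (!done) { // the loop also guards against spurious wakeups
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true; // remember the interrupt, keep waiting
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the flag for callers
            }
        }
    }

    /** Replays the statements on one worker thread; returns them in execution order. */
    static List<String> replay(List<String> statements) {
        final BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
        final List<String> executed = new ArrayList<>();

        // A single consumer of a FIFO queue processes tasks in submission order.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Task task = queue.take();
                    if (task.kill) {
                        task.markCompleted();
                        return; // the kill task ends the worker thread
                    }
                    executed.add(task.sql); // placeholder for executing the request
                    task.markCompleted();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        List<Task> pending = new ArrayList<>();
        for (String sql : statements) {
            Task task = new Task(sql, false);
            pending.add(task);
            queue.add(task);
        }
        Task killTask = new Task(null, true);
        queue.add(killTask);

        for (Task task : pending) {
            task.awaitCompletion();
        }
        killTask.awaitCompletion();
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(replay(Arrays.asList("INSERT 1", "UPDATE 2", "DELETE 3")));
    }
}
```

In this sketch the waiting thread remembers an interrupt and restores the flag once the task is done, whereas the listing above simply ignores it. Both keep waiting until the completion condition holds. Reading `executed` after `awaitCompletion()` is safe here because the worker appends to the list before entering the task's monitor in `markCompleted()`.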